beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/3] beam git commit: [BEAM-1886] Remove TextIO override in Flink runner
Date Thu, 20 Apr 2017 16:01:46 GMT
[BEAM-1886] Remove TextIO override in Flink runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0c030d49
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0c030d49
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0c030d49

Branch: refs/heads/master
Commit: 0c030d49a79ce20e25a8af3110442130dab8899c
Parents: 4121ec4
Author: JingsongLi <lzljs3620320@aliyun.com>
Authored: Fri Apr 14 11:22:43 2017 +0800
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Apr 20 18:01:19 2017 +0200

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 55 --------------------
 .../flink/streaming/GroupByNullKeyTest.java     | 11 ++--
 2 files changed, 8 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0c030d49/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 123d5e7..71f315d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -50,7 +50,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -80,10 +79,8 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -92,8 +89,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This class contains all the mappings between Beam and Flink
@@ -116,7 +111,6 @@ class FlinkStreamingTransformTranslators {
   static {
     TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
     TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
-    TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
 
     TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator());
     TRANSLATORS.put(
@@ -145,55 +139,6 @@ class FlinkStreamingTransformTranslators {
   //  Transformation Implementations
   // --------------------------------------------------------------------------------------------
 
-  private static class TextIOWriteBoundStreamingTranslator
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> {
-
-    private static final Logger LOG =
-        LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
-
-    @Override
-    public void translateNode(
-        TextIO.Write.Bound transform,
-        FlinkStreamingTranslationContext context) {
-      PValue input = context.getInput(transform);
-      DataStream<WindowedValue<String>> inputDataStream = context.getInputDataStream(input);
-
-      String filenamePrefix = transform.getFilenamePrefix();
-      String filenameSuffix = transform.getFilenameSuffix();
-      boolean needsValidation = transform.needsValidation();
-      int numShards = transform.getNumShards();
-      String shardNameTemplate = transform.getShardNameTemplate();
-
-      // TODO: Implement these. We need Flink support for this.
-      LOG.warn(
-          "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
-          needsValidation);
-      LOG.warn(
-          "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
-          filenameSuffix);
-      LOG.warn(
-          "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
-          shardNameTemplate);
-
-      DataStream<String> dataSink = inputDataStream
-          .flatMap(new FlatMapFunction<WindowedValue<String>, String>() {
-            @Override
-            public void flatMap(
-                WindowedValue<String> value,
-                Collector<String> out)
-                throws Exception {
-              out.collect(value.getValue());
-            }
-          });
-      DataStreamSink<String> output =
-          dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
-
-      if (numShards > 0) {
-        output.setParallelism(numShards);
-      }
-    }
-  }
-
   private static class UnboundedReadSourceTranslator<T>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0c030d49/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 663b910..82d9f4f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.flink.streaming;
 
 import com.google.common.base.Joiner;
+import java.io.File;
 import java.io.Serializable;
 import java.util.Arrays;
 import org.apache.beam.runners.flink.FlinkTestPipeline;
@@ -41,7 +42,7 @@ import org.joda.time.Instant;
  */
 public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable {
 
-
+  protected String resultDir;
   protected String resultPath;
 
   static final String[] EXPECTED_RESULT = new String[] {
 @@ -53,12 +54,16 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
 
   @Override
   protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
+    // Beam Write will add shard suffix to fileName, see ShardNameTemplate.
+    // So tempFile need have a parent to compare.
+    File resultParent = createAndRegisterTempFile("result");
+    resultDir = resultParent.toURI().toString();
+    resultPath = new File(resultParent, "file.txt").getAbsolutePath();
   }
 
   @Override
   protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultDir);
   }
 
   /**


Mime
View raw message