beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/4] incubator-beam git commit: [BEAM-158] add support for bounded sources in streaming
Date Mon, 18 Apr 2016 14:38:13 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 7646384e2 -> 56e28a90f


[BEAM-158] add support for bounded sources in streaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/444a0bbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/444a0bbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/444a0bbb

Branch: refs/heads/master
Commit: 444a0bbba51a598689b7e6ccf11de5f6f23d5211
Parents: 7c4f2dc
Author: Maximilian Michels <mxm@apache.org>
Authored: Thu Mar 31 10:18:01 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Mon Apr 18 16:36:43 2016 +0200

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 46 +++++++++++-
 .../streaming/io/UnboundedSourceWrapper.java    |  4 +-
 .../beam/runners/flink/ReadSourceITCase.java    |  9 ++-
 .../flink/ReadSourceStreamingITCase.java        | 74 ++++++++++++++++++++
 4 files changed, 124 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/444a0bbb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index a1e9f47..927c3a2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink.translation;
 
 import org.apache.beam.runners.flink.translation.functions.UnionCoder;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupAlsoByWindowWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupByKeyWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkParDoBoundMultiWrapper;
@@ -29,6 +30,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.Unbounded
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Combine;
@@ -62,8 +64,12 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
+import org.apache.flink.streaming.api.functions.TimestampAssigner;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
+import org.apache.kafka.common.utils.Time;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,6 +100,7 @@ public class FlinkStreamingTransformTranslators {
   // here you can find all the available translators.
   static {
     TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator());
+    TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
     TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
     TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
     TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
@@ -125,7 +132,7 @@ public class FlinkStreamingTransformTranslators {
       // in the FlatMap function using the Coder.
 
       List<byte[]> serializedElements = Lists.newArrayList();
-      Coder<OUT> elementCoder = context.getOutput(transform).getCoder();
+      Coder<OUT> elementCoder = output.getCoder();
       for (OUT element: elements) {
         ByteArrayOutputStream bao = new ByteArrayOutputStream();
         try {
@@ -148,7 +155,7 @@ public class FlinkStreamingTransformTranslators {
       DataStream<WindowedValue<OUT>> outputDataStream = initDataSet.flatMap(createFunction)
           .returns(outputType);
 
-      context.setOutputDataStream(context.getOutput(transform), outputDataStream);
+      context.setOutputDataStream(output, outputDataStream);
     }
   }
 
@@ -186,6 +193,41 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
+  private static class BoundedReadSourceTranslator<T>
+      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
+
+    @Override
+    public void translateNode(Read.Bounded<T> transform, FlinkStreamingTranslationContext context) {
+
+      BoundedSource<T> boundedSource = transform.getSource();
+      PCollection<T> output = context.getOutput(transform);
+
+      Coder<T> defaultOutputCoder = boundedSource.getDefaultOutputCoder();
+      CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(defaultOutputCoder);
+
+      DataStream<T> source = context.getExecutionEnvironment().createInput(
+          new SourceInputFormat<>(
+              boundedSource,
+              context.getPipelineOptions()),
+          typeInfo);
+
+      DataStream<WindowedValue<T>> windowedStream = source.flatMap(
+          new FlatMapFunction<T, WindowedValue<T>>() {
+            @Override
+            public void flatMap(T value, Collector<WindowedValue<T>> out) throws Exception {
+              out.collect(
+                  WindowedValue.of(value,
+                    Instant.now(),
+                    GlobalWindow.INSTANCE,
+                    PaneInfo.NO_FIRING));
+            }
+          })
+          .assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());
+
+      context.setOutputDataStream(output, windowedStream);
+    }
+  }
+
   private static class UnboundedReadSourceTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/444a0bbb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index dcc7967..5be34e6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -147,12 +147,12 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
   private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
     if (this.isRunning) {
       long watermarkInterval =  runtime.getExecutionConfig().getAutoWatermarkInterval();
-      long timeToNextWatermark = getTimeToNextWaternark(watermarkInterval);
+      long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval);
       runtime.registerTimer(timeToNextWatermark, this);
     }
   }
 
-  private long getTimeToNextWaternark(long watermarkInterval) {
+  private long getTimeToNextWatermark(long watermarkInterval) {
     return System.currentTimeMillis() + watermarkInterval;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/444a0bbb/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index 4f63925..66c959e 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -18,10 +18,7 @@
 package org.apache.beam.runners.flink;
 
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -31,6 +28,9 @@ import com.google.common.base.Joiner;
 
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+/**
+ * Reads from a bounded source in batch execution.
+ */
 public class ReadSourceITCase extends JavaProgramTestBase {
 
   protected String resultPath;
@@ -44,7 +44,6 @@ public class ReadSourceITCase extends JavaProgramTestBase {
   @Override
   protected void preSubmit() throws Exception {
     resultPath = getTempDirPath("result");
-    System.out.println(resultPath);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/444a0bbb/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
new file mode 100644
index 0000000..fe71802
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import com.google.common.base.Joiner;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+/**
+ * Reads from a bounded source in streaming.
+ */
+public class ReadSourceStreamingITCase extends StreamingProgramTestBase {
+
+  protected String resultPath;
+
+  public ReadSourceStreamingITCase(){
+  }
+
+  private static final String[] EXPECTED_RESULT = new String[] {
+     "0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  private static void runProgram(String resultPath) {
+
+    Pipeline p = FlinkTestPipeline.createForStreaming();
+
+    p
+      .apply(CountingInput.upTo(10))
+      .apply(ParDo.of(new DoFn<Long, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {
+            c.output(c.element().toString());
+          }
+        }))
+      .apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
+}
+
+


Mime
View raw message