From: aljoscha@apache.org
To: commits@beam.apache.org
Date: Wed, 22 Feb 2017 17:29:20 -0000
Subject: [3/6] beam git commit: Move Flink Runner classes to base package and make package private

http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
new file mode 100644
index 0000000..eaab3d1
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -0,0 +1,1043 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.FlinkCoder;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import
org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitStream; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.transformations.TwoInputTransformation; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class contains all the mappings between Beam and Flink + * streaming transformations. The {@link FlinkStreamingPipelineTranslator} + * traverses the Beam job and comes here to translate the encountered Beam transformations + * into Flink one, based on the mapping available in this class. + */ +class FlinkStreamingTransformTranslators { + + // -------------------------------------------------------------------------------------------- + // Transform Translator Registry + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("rawtypes") + private static final Map< + Class, + FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>(); + + // here you can find all the available translators. + static { + TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator()); + TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator()); + TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator()); + TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); + + TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator()); + TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator()); + + TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator()); + TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator()); + TRANSLATORS.put( + FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class, + new CreateViewStreamingTranslator()); + + TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming()); + TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator()); + TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator()); + } + + public static FlinkStreamingPipelineTranslator.StreamTransformTranslator getTranslator( + PTransform transform) { + return TRANSLATORS.get(transform.getClass()); + } + + // -------------------------------------------------------------------------------------------- + // Transformation Implementations + // -------------------------------------------------------------------------------------------- + + private static class TextIOWriteBoundStreamingTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator { + + private static final Logger LOG = + LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class); + + @Override + public void translateNode( + TextIO.Write.Bound transform, + FlinkStreamingTranslationContext context) { + PValue input = context.getInput(transform); + DataStream> inputDataStream = 
context.getInputDataStream(input); + + String filenamePrefix = transform.getFilenamePrefix(); + String filenameSuffix = transform.getFilenameSuffix(); + boolean needsValidation = transform.needsValidation(); + int numShards = transform.getNumShards(); + String shardNameTemplate = transform.getShardNameTemplate(); + + // TODO: Implement these. We need Flink support for this. + LOG.warn( + "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", + needsValidation); + LOG.warn( + "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", + filenameSuffix); + LOG.warn( + "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", + shardNameTemplate); + + DataStream dataSink = inputDataStream + .flatMap(new FlatMapFunction, String>() { + @Override + public void flatMap( + WindowedValue value, + Collector out) + throws Exception { + out.collect(value.getValue()); + } + }); + DataStreamSink output = + dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE); + + if (numShards > 0) { + output.setParallelism(numShards); + } + } + } + + private static class WriteSinkStreamingTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator> { + + @Override + public void translateNode(Write.Bound transform, FlinkStreamingTranslationContext context) { + String name = transform.getName(); + PValue input = context.getInput(transform); + + Sink sink = transform.getSink(); + if (!(sink instanceof UnboundedFlinkSink)) { + throw new UnsupportedOperationException( + "At the time, only unbounded Flink sinks are supported."); + } + + DataStream> inputDataSet = context.getInputDataStream(input); + + inputDataSet.flatMap(new FlatMapFunction, Object>() { + @Override + public void flatMap(WindowedValue value, Collector out) throws Exception { + out.collect(value.getValue()); + } + }).addSink(((UnboundedFlinkSink) sink).getFlinkSource()).name(name); + } + } + + private static class UnboundedReadSourceTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator> { + + @Override + public void translateNode( + Read.Unbounded transform, + FlinkStreamingTranslationContext context) { + PCollection output = context.getOutput(transform); + + TypeInformation> outputTypeInfo = + context.getTypeInfo(context.getOutput(transform)); + + DataStream> source; + if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) { + @SuppressWarnings("unchecked") + UnboundedFlinkSource flinkSourceFunction = + (UnboundedFlinkSource) transform.getSource(); + + final AssignerWithPeriodicWatermarks flinkAssigner = + flinkSourceFunction.getFlinkTimestampAssigner(); + + DataStream flinkSource = context.getExecutionEnvironment() + .addSource(flinkSourceFunction.getFlinkSource()); + + flinkSourceFunction.setCoder( + new FlinkCoder(flinkSource.getType(), + context.getExecutionEnvironment().getConfig())); + + source = flinkSource + .assignTimestampsAndWatermarks(flinkAssigner) + .flatMap(new FlatMapFunction>() { + @Override + public void flatMap(T s, Collector> collector) throws Exception { + collector.collect( + WindowedValue.of( + s, + new Instant(flinkAssigner.extractTimestamp(s, -1)), + GlobalWindow.INSTANCE, + PaneInfo.NO_FIRING)); + }}).returns(outputTypeInfo); + } else { + try { + UnboundedSourceWrapper sourceWrapper = + new UnboundedSourceWrapper<>( + context.getPipelineOptions(), + transform.getSource(), + context.getExecutionEnvironment().getParallelism()); + source = context + .getExecutionEnvironment() + 
.addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo); + } catch (Exception e) { + throw new RuntimeException( + "Error while translating UnboundedSource: " + transform.getSource(), e); + } + } + + context.setOutputDataStream(output, source); + } + } + + private static class BoundedReadSourceTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator> { + + @Override + public void translateNode( + Read.Bounded transform, + FlinkStreamingTranslationContext context) { + PCollection output = context.getOutput(transform); + + TypeInformation> outputTypeInfo = + context.getTypeInfo(context.getOutput(transform)); + + + DataStream> source; + try { + BoundedSourceWrapper sourceWrapper = + new BoundedSourceWrapper<>( + context.getPipelineOptions(), + transform.getSource(), + context.getExecutionEnvironment().getParallelism()); + source = context + .getExecutionEnvironment() + .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo); + } catch (Exception e) { + throw new RuntimeException( + "Error while translating BoundedSource: " + transform.getSource(), e); + } + + context.setOutputDataStream(output, source); + } + } + + private static void rejectSplittable(DoFn doFn) { + DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); + if (signature.processElement().isSplittable()) { + throw new UnsupportedOperationException( + String.format( + "%s does not currently support splittable DoFn: %s", + FlinkRunner.class.getSimpleName(), doFn)); + } + } + + private static class ParDoBoundStreamingTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + ParDo.Bound> { + + @Override + public void translateNode( + ParDo.Bound transform, + FlinkStreamingTranslationContext context) { + + DoFn doFn = transform.getFn(); + rejectSplittable(doFn); + + WindowingStrategy windowingStrategy = + context.getOutput(transform).getWindowingStrategy(); + + TypeInformation> typeInfo = + context.getTypeInfo(context.getOutput(transform)); + + List> sideInputs = transform.getSideInputs(); + + @SuppressWarnings("unchecked") + PCollection inputPCollection = (PCollection) context.getInput(transform); + + Coder> inputCoder = context.getCoder(inputPCollection); + + DataStream> inputDataStream = + context.getInputDataStream(context.getInput(transform)); + Coder keyCoder = null; + boolean stateful = false; + DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + if (signature.stateDeclarations().size() > 0 + || signature.timerDeclarations().size() > 0) { + // Based on the fact that the signature is stateful, DoFnSignatures ensures + // that it is also keyed + keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder(); + inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder)); + stateful = true; + } + + if (sideInputs.isEmpty()) { + DoFnOperator> doFnOperator = + new DoFnOperator<>( + transform.getFn(), + inputCoder, + new TupleTag("main output"), + Collections.>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory>(), + windowingStrategy, + new HashMap>(), /* side-input mapping */ + Collections.>emptyList(), /* side inputs */ + context.getPipelineOptions(), + keyCoder); + + SingleOutputStreamOperator> outDataStream = inputDataStream + .transform(transform.getName(), typeInfo, doFnOperator); + + context.setOutputDataStream(context.getOutput(transform), outDataStream); + } else { + Tuple2>, DataStream> transformedSideInputs = + transformSideInputs(sideInputs, context); + 
+ DoFnOperator> doFnOperator = + new DoFnOperator<>( + transform.getFn(), + inputCoder, + new TupleTag("main output"), + Collections.>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory>(), + windowingStrategy, + transformedSideInputs.f0, + sideInputs, + context.getPipelineOptions(), + keyCoder); + + SingleOutputStreamOperator> outDataStream; + if (stateful) { + // we have to manually contruct the two-input transform because we're not + // allowed to have only one input keyed, normally. + KeyedStream keyedStream = (KeyedStream) inputDataStream; + TwoInputTransformation< + WindowedValue>, + RawUnionValue, + WindowedValue> rawFlinkTransform = new TwoInputTransformation<>( + keyedStream.getTransformation(), + transformedSideInputs.f1.broadcast().getTransformation(), + transform.getName(), + (TwoInputStreamOperator) doFnOperator, + typeInfo, + keyedStream.getParallelism()); + + rawFlinkTransform.setStateKeyType(keyedStream.getKeyType()); + rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null); + + outDataStream = new SingleOutputStreamOperator( + keyedStream.getExecutionEnvironment(), + rawFlinkTransform) {}; // we have to cheat around the ctor being protected + + keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform); + } else { + outDataStream = inputDataStream + .connect(transformedSideInputs.f1.broadcast()) + .transform(transform.getName(), typeInfo, doFnOperator); + } + context.setOutputDataStream(context.getOutput(transform), outDataStream); + } + } + } + + /** + * Wraps each element in a {@link RawUnionValue} with the given tag id. + */ + private static class ToRawUnion implements MapFunction { + private final int intTag; + + public ToRawUnion(int intTag) { + this.intTag = intTag; + } + + @Override + public RawUnionValue map(T o) throws Exception { + return new RawUnionValue(intTag, o); + } + } + + private static Tuple2>, DataStream> + transformSideInputs( + Collection> sideInputs, + FlinkStreamingTranslationContext context) { + + // collect all side inputs + Map, Integer> tagToIntMapping = new HashMap<>(); + Map> intToViewMapping = new HashMap<>(); + int count = 0; + for (PCollectionView sideInput: sideInputs) { + TupleTag tag = sideInput.getTagInternal(); + intToViewMapping.put(count, sideInput); + tagToIntMapping.put(tag, count); + count++; + Coder>> coder = sideInput.getCoderInternal(); + } + + + List> inputCoders = new ArrayList<>(); + for (PCollectionView sideInput: sideInputs) { + DataStream sideInputStream = context.getInputDataStream(sideInput); + TypeInformation tpe = sideInputStream.getType(); + if (!(tpe instanceof CoderTypeInformation)) { + throw new IllegalStateException( + "Input Stream TypeInformation is no CoderTypeInformation."); + } + + Coder coder = ((CoderTypeInformation) tpe).getCoder(); + inputCoders.add(coder); + } + + UnionCoder unionCoder = UnionCoder.of(inputCoders); + + CoderTypeInformation unionTypeInformation = + new CoderTypeInformation<>(unionCoder); + + // transform each side input to RawUnionValue and union them + DataStream sideInputUnion = null; + + for (PCollectionView sideInput: sideInputs) { + TupleTag tag = sideInput.getTagInternal(); + final int intTag = tagToIntMapping.get(tag); + DataStream sideInputStream = context.getInputDataStream(sideInput); + DataStream unionValueStream = + sideInputStream.map(new ToRawUnion<>(intTag)).returns(unionTypeInformation); + + if (sideInputUnion == null) { + sideInputUnion = unionValueStream; + } else { + sideInputUnion = sideInputUnion.union(unionValueStream); + } + } 
+ + if (sideInputUnion == null) { + throw new IllegalStateException("No unioned side inputs, this indicates a bug."); + } + + return new Tuple2<>(intToViewMapping, sideInputUnion); + } + + + private static class ParDoBoundMultiStreamingTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + ParDo.BoundMulti> { + + @Override + public void translateNode( + ParDo.BoundMulti transform, + FlinkStreamingTranslationContext context) { + + DoFn doFn = transform.getFn(); + rejectSplittable(doFn); + + // we assume that the transformation does not change the windowing strategy. + WindowingStrategy windowingStrategy = + context.getInput(transform).getWindowingStrategy(); + + List outputs = context.getOutputs(transform); + + Map, Integer> tagsToLabels = + transformTupleTagsToLabels(transform.getMainOutputTag(), outputs); + + List> sideInputs = transform.getSideInputs(); + + SingleOutputStreamOperator unionOutputStream; + + @SuppressWarnings("unchecked") + PCollection inputPCollection = (PCollection) context.getInput(transform); + + Coder> inputCoder = context.getCoder(inputPCollection); + + DataStream> inputDataStream = + context.getInputDataStream(context.getInput(transform)); + Coder keyCoder = null; + boolean stateful = false; + DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + if (signature.stateDeclarations().size() > 0 + || signature.timerDeclarations().size() > 0) { + // Based on the fact that the signature is stateful, DoFnSignatures ensures + // that it is also keyed + keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder(); + inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder)); + stateful = true; + } + + if (sideInputs.isEmpty()) { + DoFnOperator doFnOperator = + new DoFnOperator<>( + transform.getFn(), + inputCoder, + transform.getMainOutputTag(), + transform.getSideOutputTags().getAll(), + new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), + windowingStrategy, + new HashMap>(), /* side-input mapping */ + Collections.>emptyList(), /* side inputs */ + context.getPipelineOptions(), + keyCoder); + + UnionCoder outputUnionCoder = createUnionCoder(outputs); + + CoderTypeInformation outputUnionTypeInformation = + new CoderTypeInformation<>(outputUnionCoder); + + unionOutputStream = inputDataStream + .transform(transform.getName(), outputUnionTypeInformation, doFnOperator); + + } else { + Tuple2>, DataStream> transformedSideInputs = + transformSideInputs(sideInputs, context); + + DoFnOperator doFnOperator = + new DoFnOperator<>( + transform.getFn(), + inputCoder, + transform.getMainOutputTag(), + transform.getSideOutputTags().getAll(), + new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), + windowingStrategy, + transformedSideInputs.f0, + sideInputs, + context.getPipelineOptions(), + keyCoder); + + UnionCoder outputUnionCoder = createUnionCoder(outputs); + + CoderTypeInformation outputUnionTypeInformation = + new CoderTypeInformation<>(outputUnionCoder); + + if (stateful) { + // we have to manually contruct the two-input transform because we're not + // allowed to have only one input keyed, normally. 
+ KeyedStream keyedStream = (KeyedStream) inputDataStream; + TwoInputTransformation< + WindowedValue>, + RawUnionValue, + WindowedValue> rawFlinkTransform = new TwoInputTransformation( + keyedStream.getTransformation(), + transformedSideInputs.f1.broadcast().getTransformation(), + transform.getName(), + (TwoInputStreamOperator) doFnOperator, + outputUnionTypeInformation, + keyedStream.getParallelism()); + + rawFlinkTransform.setStateKeyType(keyedStream.getKeyType()); + rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null); + + unionOutputStream = new SingleOutputStreamOperator( + keyedStream.getExecutionEnvironment(), + rawFlinkTransform) {}; // we have to cheat around the ctor being protected + + keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform); + + } else { + unionOutputStream = inputDataStream + .connect(transformedSideInputs.f1.broadcast()) + .transform(transform.getName(), outputUnionTypeInformation, doFnOperator); + } + } + + SplitStream splitStream = unionOutputStream + .split(new OutputSelector() { + @Override + public Iterable select(RawUnionValue value) { + return Collections.singletonList(Integer.toString(value.getUnionTag())); + } + }); + + for (TaggedPValue output : outputs) { + final int outputTag = tagsToLabels.get(output.getTag()); + + TypeInformation outputTypeInfo = context.getTypeInfo((PCollection) output.getValue()); + + @SuppressWarnings("unchecked") + DataStream unwrapped = splitStream.select(String.valueOf(outputTag)) + .flatMap(new FlatMapFunction() { + @Override + public void flatMap(RawUnionValue value, Collector out) throws Exception { + out.collect(value.getValue()); + } + }).returns(outputTypeInfo); + + context.setOutputDataStream(output.getValue(), unwrapped); + } + } + + private Map, Integer> transformTupleTagsToLabels( + TupleTag mainTag, + List allTaggedValues) { + + Map, Integer> tagToLabelMap = Maps.newHashMap(); + int count = 0; + tagToLabelMap.put(mainTag, count++); + for (TaggedPValue taggedPValue : allTaggedValues) { + if (!tagToLabelMap.containsKey(taggedPValue.getTag())) { + tagToLabelMap.put(taggedPValue.getTag(), count++); + } + } + return tagToLabelMap; + } + + private UnionCoder createUnionCoder(Collection taggedCollections) { + List> outputCoders = Lists.newArrayList(); + for (TaggedPValue taggedColl : taggedCollections) { + checkArgument( + taggedColl.getValue() instanceof PCollection, + "A Union Coder can only be created for a Collection of Tagged %s. 
Got %s", + PCollection.class.getSimpleName(), + taggedColl.getValue().getClass().getSimpleName()); + PCollection coll = (PCollection) taggedColl.getValue(); + WindowedValue.FullWindowedValueCoder windowedValueCoder = + WindowedValue.getFullCoder( + coll.getCoder(), + coll.getWindowingStrategy().getWindowFn().windowCoder()); + outputCoders.add(windowedValueCoder); + } + return UnionCoder.of(outputCoders); + } + } + + private static class CreateViewStreamingTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + FlinkStreamingViewOverrides.CreateFlinkPCollectionView> { + + @Override + public void translateNode( + FlinkStreamingViewOverrides.CreateFlinkPCollectionView transform, + FlinkStreamingTranslationContext context) { + // just forward + DataStream>> inputDataSet = + context.getInputDataStream(context.getInput(transform)); + + PCollectionView view = context.getOutput(transform); + + context.setOutputDataStream(view, inputDataSet); + } + } + + private static class WindowBoundTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator> { + + @Override + public void translateNode( + Window.Bound transform, + FlinkStreamingTranslationContext context) { + + @SuppressWarnings("unchecked") + WindowingStrategy windowingStrategy = + (WindowingStrategy) + context.getOutput(transform).getWindowingStrategy(); + + TypeInformation> typeInfo = + context.getTypeInfo(context.getOutput(transform)); + + DataStream> inputDataStream = + context.getInputDataStream(context.getInput(transform)); + + WindowFn windowFn = windowingStrategy.getWindowFn(); + + FlinkAssignWindows assignWindowsFunction = + new FlinkAssignWindows<>(windowFn); + + SingleOutputStreamOperator> outputDataStream = inputDataStream + .flatMap(assignWindowsFunction) + .name(context.getOutput(transform).getName()) + .returns(typeInfo); + + context.setOutputDataStream(context.getOutput(transform), outputDataStream); + } + } + + private static class ReshuffleTranslatorStreaming + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator> { + + @Override + public void translateNode( + Reshuffle transform, + FlinkStreamingTranslationContext context) { + + DataStream>> inputDataSet = + context.getInputDataStream(context.getInput(transform)); + + context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance()); + + } + } + + + private static class GroupByKeyTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator> { + + @Override + public void translateNode( + GroupByKey transform, + FlinkStreamingTranslationContext context) { + + PCollection> input = context.getInput(transform); + + @SuppressWarnings("unchecked") + WindowingStrategy windowingStrategy = + (WindowingStrategy) input.getWindowingStrategy(); + + KvCoder inputKvCoder = (KvCoder) input.getCoder(); + + SingletonKeyedWorkItemCoder workItemCoder = SingletonKeyedWorkItemCoder.of( + inputKvCoder.getKeyCoder(), + inputKvCoder.getValueCoder(), + input.getWindowingStrategy().getWindowFn().windowCoder()); + + DataStream>> inputDataStream = context.getInputDataStream(input); + + WindowedValue. 
+ FullWindowedValueCoder> windowedWorkItemCoder = + WindowedValue.getFullCoder( + workItemCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + CoderTypeInformation>> workItemTypeInfo = + new CoderTypeInformation<>(windowedWorkItemCoder); + + DataStream>> workItemStream = + inputDataStream + .flatMap(new CombinePerKeyTranslator.ToKeyedWorkItem()) + .returns(workItemTypeInfo).name("ToKeyedWorkItem"); + + KeyedStream< + WindowedValue< + SingletonKeyedWorkItem>, ByteBuffer> keyedWorkItemStream = workItemStream + .keyBy(new WorkItemKeySelector(inputKvCoder.getKeyCoder())); + + SystemReduceFn, Iterable, BoundedWindow> reduceFn = + SystemReduceFn.buffering(inputKvCoder.getValueCoder()); + + TypeInformation>>> outputTypeInfo = + context.getTypeInfo(context.getOutput(transform)); + + DoFnOperator.DefaultOutputManagerFactory< + WindowedValue>>> outputManagerFactory = + new DoFnOperator.DefaultOutputManagerFactory<>(); + + WindowDoFnOperator> doFnOperator = + new WindowDoFnOperator<>( + reduceFn, + (Coder) windowedWorkItemCoder, + new TupleTag>>("main output"), + Collections.>emptyList(), + outputManagerFactory, + windowingStrategy, + new HashMap>(), /* side-input mapping */ + Collections.>emptyList(), /* side inputs */ + context.getPipelineOptions(), + inputKvCoder.getKeyCoder()); + + // our operator excepts WindowedValue while our input stream + // is WindowedValue, which is fine but Java doesn't like it ... + @SuppressWarnings("unchecked") + SingleOutputStreamOperator>>> outDataStream = + keyedWorkItemStream + .transform( + transform.getName(), + outputTypeInfo, + (OneInputStreamOperator) doFnOperator); + + context.setOutputDataStream(context.getOutput(transform), outDataStream); + + } + } + + private static class CombinePerKeyTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + Combine.PerKey> { + + @Override + boolean canTranslate( + Combine.PerKey transform, + FlinkStreamingTranslationContext context) { + + // if we have a merging window strategy and side inputs we cannot + // translate as a proper combine. We have to group and then run the combine + // over the final grouped values. + PCollection> input = context.getInput(transform); + + @SuppressWarnings("unchecked") + WindowingStrategy windowingStrategy = + (WindowingStrategy) input.getWindowingStrategy(); + + return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty(); + } + + @Override + public void translateNode( + Combine.PerKey transform, + FlinkStreamingTranslationContext context) { + + PCollection> input = context.getInput(transform); + + @SuppressWarnings("unchecked") + WindowingStrategy windowingStrategy = + (WindowingStrategy) input.getWindowingStrategy(); + + KvCoder inputKvCoder = (KvCoder) input.getCoder(); + + SingletonKeyedWorkItemCoder workItemCoder = SingletonKeyedWorkItemCoder.of( + inputKvCoder.getKeyCoder(), + inputKvCoder.getValueCoder(), + input.getWindowingStrategy().getWindowFn().windowCoder()); + + DataStream>> inputDataStream = context.getInputDataStream(input); + + WindowedValue. 
+ FullWindowedValueCoder> windowedWorkItemCoder = + WindowedValue.getFullCoder( + workItemCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + CoderTypeInformation>> workItemTypeInfo = + new CoderTypeInformation<>(windowedWorkItemCoder); + + DataStream>> workItemStream = + inputDataStream + .flatMap(new ToKeyedWorkItem()) + .returns(workItemTypeInfo).name("ToKeyedWorkItem"); + + KeyedStream< + WindowedValue< + SingletonKeyedWorkItem>, ByteBuffer> keyedWorkItemStream = workItemStream + .keyBy(new WorkItemKeySelector(inputKvCoder.getKeyCoder())); + + SystemReduceFn reduceFn = SystemReduceFn.combining( + inputKvCoder.getKeyCoder(), + AppliedCombineFn.withInputCoder( + transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder)); + + TypeInformation>> outputTypeInfo = + context.getTypeInfo(context.getOutput(transform)); + + List> sideInputs = transform.getSideInputs(); + + if (sideInputs.isEmpty()) { + + WindowDoFnOperator doFnOperator = + new WindowDoFnOperator<>( + reduceFn, + (Coder) windowedWorkItemCoder, + new TupleTag>("main output"), + Collections.>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory>>(), + windowingStrategy, + new HashMap>(), /* side-input mapping */ + Collections.>emptyList(), /* side inputs */ + context.getPipelineOptions(), + inputKvCoder.getKeyCoder()); + + // our operator excepts WindowedValue while our input stream + // is WindowedValue, which is fine but Java doesn't like it ... + @SuppressWarnings("unchecked") + SingleOutputStreamOperator>> outDataStream = + keyedWorkItemStream.transform( + transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator); + + context.setOutputDataStream(context.getOutput(transform), outDataStream); + } else { + Tuple2>, DataStream> transformSideInputs = + transformSideInputs(sideInputs, context); + + WindowDoFnOperator doFnOperator = + new WindowDoFnOperator<>( + reduceFn, + (Coder) windowedWorkItemCoder, + new TupleTag>("main output"), + Collections.>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory>>(), + windowingStrategy, + transformSideInputs.f0, + sideInputs, + context.getPipelineOptions(), + inputKvCoder.getKeyCoder()); + + // we have to manually contruct the two-input transform because we're not + // allowed to have only one input keyed, normally. 
+ + TwoInputTransformation< + WindowedValue>, + RawUnionValue, + WindowedValue>> rawFlinkTransform = new TwoInputTransformation<>( + keyedWorkItemStream.getTransformation(), + transformSideInputs.f1.broadcast().getTransformation(), + transform.getName(), + (TwoInputStreamOperator) doFnOperator, + outputTypeInfo, + keyedWorkItemStream.getParallelism()); + + rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType()); + rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null); + + @SuppressWarnings({ "unchecked", "rawtypes" }) + SingleOutputStreamOperator>> outDataStream = + new SingleOutputStreamOperator( + keyedWorkItemStream.getExecutionEnvironment(), + rawFlinkTransform) {}; // we have to cheat around the ctor being protected + + keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform); + + context.setOutputDataStream(context.getOutput(transform), outDataStream); + } + } + + private static class ToKeyedWorkItem + extends RichFlatMapFunction< + WindowedValue>, + WindowedValue>> { + + @Override + public void flatMap( + WindowedValue> inWithMultipleWindows, + Collector>> out) throws Exception { + + // we need to wrap each one work item per window for now + // since otherwise the PushbackSideInputRunner will not correctly + // determine whether side inputs are ready + for (WindowedValue> in : inWithMultipleWindows.explodeWindows()) { + SingletonKeyedWorkItem workItem = + new SingletonKeyedWorkItem<>( + in.getValue().getKey(), + in.withValue(in.getValue().getValue())); + + in.withValue(workItem); + out.collect(in.withValue(workItem)); + } + } + } + } + + private static class FlattenPCollectionTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + Flatten.FlattenPCollectionList> { + + @Override + public void translateNode( + Flatten.FlattenPCollectionList transform, + FlinkStreamingTranslationContext context) { + List allInputs = context.getInputs(transform); + + if (allInputs.isEmpty()) { + + // create an empty dummy source to satisfy downstream operations + // we cannot create an empty source in Flink, therefore we have to + // add the flatMap that simply never forwards the single element + DataStreamSource dummySource = + context.getExecutionEnvironment().fromElements("dummy"); + + DataStream> result = dummySource.flatMap( + new FlatMapFunction>() { + @Override + public void flatMap( + String s, + Collector> collector) throws Exception { + // never return anything + } + }).returns( + new CoderTypeInformation<>( + WindowedValue.getFullCoder( + (Coder) VoidCoder.of(), + GlobalWindow.Coder.INSTANCE))); + context.setOutputDataStream(context.getOutput(transform), result); + + } else { + DataStream result = null; + for (TaggedPValue input : allInputs) { + DataStream current = context.getInputDataStream(input.getValue()); + result = (result == null) ? 
+              current : result.union(current);
+        }
+        context.setOutputDataStream(context.getOutput(transform), result);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
new file mode 100644
index 0000000..3d5b83f
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Helper for keeping track of which {@link DataStream DataStreams} map
+ * to which {@link PTransform PTransforms}.
+ */
+class FlinkStreamingTranslationContext {
+
+  private final StreamExecutionEnvironment env;
+  private final PipelineOptions options;
+
+  /**
+   * Keeps a mapping between the output value of the PTransform (in Dataflow) and the
+   * Flink operator that produced it, after the translation of the corresponding PTransform
+   * to its Flink equivalent.
+   */
+  private final Map<PValue, DataStream<?>> dataStreams;
+
+  private AppliedPTransform<?, ?, ?> currentTransform;
+
+  public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) {
+    this.env = checkNotNull(env);
+    this.options = checkNotNull(options);
+    this.dataStreams = new HashMap<>();
+  }
+
+  public StreamExecutionEnvironment getExecutionEnvironment() {
+    return env;
+  }
+
+  public PipelineOptions getPipelineOptions() {
+    return options;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> DataStream<T> getInputDataStream(PValue value) {
+    return (DataStream<T>) dataStreams.get(value);
+  }
+
+  public void setOutputDataStream(PValue value, DataStream<?> set) {
+    if (!dataStreams.containsKey(value)) {
+      dataStreams.put(value, set);
+    }
+  }
+
+  /**
+   * Sets the AppliedPTransform which carries input/output.
+   * @param currentTransform the transform currently being translated
+   */
+  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+    this.currentTransform = currentTransform;
+  }
+
+  public <T> Coder<WindowedValue<T>> getCoder(PCollection<T> collection) {
+    Coder<T> valueCoder = collection.getCoder();
+
+    return WindowedValue.getFullCoder(
+        valueCoder,
+        collection.getWindowingStrategy().getWindowFn().windowCoder());
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
+    Coder<T> valueCoder = collection.getCoder();
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+        WindowedValue.getFullCoder(
+            valueCoder,
+            collection.getWindowingStrategy().getWindowFn().windowCoder());
+
+    return new CoderTypeInformation<>(windowedValueCoder);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T extends PValue> T getInput(PTransform<? extends PInput, ?> transform) {
+    return (T) Iterables.getOnlyElement(currentTransform.getInputs()).getValue();
+  }
+
+  public List<TaggedPValue> getInputs(PTransform<? extends PInput, ?> transform) {
+    return currentTransform.getInputs();
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T extends PValue> T getOutput(PTransform<?, ? extends POutput> transform) {
+    return (T) Iterables.getOnlyElement(currentTransform.getOutputs()).getValue();
+  }
+
+  public List<TaggedPValue> getOutputs(PTransform<?, ? extends POutput> transform) {
+    return currentTransform.getOutputs();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
index 0a9df4e..0ff6367 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
@@ -38,7 +38,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 
 /**
  * Flink streaming overrides for various view (side input) transforms.
  */
-public class FlinkStreamingViewOverrides {
+class FlinkStreamingViewOverrides {
 
   /**
    * Specialized implementation for

http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
new file mode 100644
index 0000000..3acc3ea
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline.
+ */
+class PipelineTranslationOptimizer extends FlinkPipelineTranslator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslationOptimizer.class);
+
+  private TranslationMode translationMode;
+
+  private final FlinkPipelineOptions options;
+
+  public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options) {
+    this.translationMode = defaultMode;
+    this.options = options;
+  }
+
+  public TranslationMode getTranslationMode() {
+
+    // a user-specified streaming option overrides the detected translation mode
+    if (options.isStreaming()) {
+      return TranslationMode.STREAMING;
+    }
+
+    return translationMode;
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {}
+
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    Class<? extends PTransform> transformClass = node.getTransform().getClass();
+    if (transformClass == Read.Unbounded.class) {
+      LOG.info("Found {}. Switching to streaming execution.", transformClass);
+      translationMode = TranslationMode.STREAMING;
+    }
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {}
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
new file mode 100644
index 0000000..ad54750
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+/**
+ * The translation mode of the Beam Pipeline.
+ */
+enum TranslationMode {
+
+  /** Uses the batch mode of Flink. */
+  BATCH,
+
+  /** Uses the streaming mode of Flink. */
+  STREAMING
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
deleted file mode 100644
index 7fb17c8..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a
- * Flink batch job.
- */
-public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
-
-  private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
-
-  /**
-   * The necessary context in the case of a batch job.
-   */
-  private final FlinkBatchTranslationContext batchContext;
-
-  private int depth = 0;
-
-  public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
-    this.batchContext = new FlinkBatchTranslationContext(env, options);
-  }
-
-  @Override
-  @SuppressWarnings("rawtypes, unchecked")
-  public void translate(Pipeline pipeline) {
-    super.translate(pipeline);
-
-    // terminate dangling DataSets
-    for (DataSet<?> dataSet : batchContext.getDanglingDataSets().values()) {
-      dataSet.output(new DiscardingOutputFormat());
-    }
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Pipeline Visitor Methods
-  // --------------------------------------------------------------------------------------------
-
-  @Override
-  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-    LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
-    this.depth++;
-
-    BatchTransformTranslator<?> translator = getTranslator(node);
-
-    if (translator != null) {
-      applyBatchTransform(node.getTransform(), node, translator);
-      LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
-      return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
-    } else {
-      return CompositeBehavior.ENTER_TRANSFORM;
-    }
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformHierarchy.Node node) {
-    this.depth--;
-    LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
-  }
-
-  @Override
-  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
-
-    // get the transformation corresponding to the node we are
-    // currently visiting and translate it into its Flink alternative.
-    PTransform<?, ?> transform = node.getTransform();
-    BatchTransformTranslator<?> translator =
-        FlinkBatchTransformTranslators.getTranslator(transform);
-    if (translator == null) {
-      LOG.info(node.getTransform().getClass().toString());
-      throw new UnsupportedOperationException("The transform " + transform
-          + " is currently not supported.");
-    }
-    applyBatchTransform(transform, node, translator);
-  }
-
-  private <T extends PTransform<?, ?>> void applyBatchTransform(
-      PTransform<?, ?> transform,
-      TransformHierarchy.Node node,
-      BatchTransformTranslator<?> translator) {
-
-    @SuppressWarnings("unchecked")
-    T typedTransform = (T) transform;
-
-    @SuppressWarnings("unchecked")
-    BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
-
-    // create the applied PTransform on the batchContext
-    batchContext.setCurrentTransform(node.toAppliedPTransform());
-    typedTranslator.translateNode(typedTransform, batchContext);
-  }
-
-  /**
-   * A translator of a {@link PTransform}.
-   */
-  public interface BatchTransformTranslator<TransformT extends PTransform> {
-    void translateNode(TransformT transform, FlinkBatchTranslationContext context);
-  }
-
-  /**
-   * Returns a translator for the given node, if possible; otherwise null.
-   */
-  private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) {
-    PTransform<?, ?> transform = node.getTransform();
-
-    // Root of the graph is null
-    if (transform == null) {
-      return null;
-    }
-
-    return FlinkBatchTransformTranslators.getTranslator(transform);
-  }
-}
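
----------------------------------------------------------------------
The commit above is built around a translator registry: FlinkStreamingTransformTranslators pairs each transform class with a translator object, and the pipeline visitor dispatches on the concrete class of the node it is visiting (getTranslator returns null for unsupported transforms). A minimal, self-contained sketch of that dispatch pattern; all names below are hypothetical, not Beam's or Flink's actual API:

import java.util.HashMap;
import java.util.Map;

public class TranslatorRegistrySketch {

  /** A translator knows how to turn one kind of transform into a target-system operator. */
  interface Translator<T> {
    void translate(T transform);
  }

  private static final Map<Class<?>, Translator<?>> TRANSLATORS = new HashMap<>();

  static <T> void register(Class<T> transformClass, Translator<T> translator) {
    TRANSLATORS.put(transformClass, translator);
  }

  /** Dispatches on the runtime class, like getTranslator(transform) in the diff above. */
  @SuppressWarnings("unchecked")
  static <T> void translate(T transform) {
    Translator<T> translator = (Translator<T>) TRANSLATORS.get(transform.getClass());
    if (translator == null) {
      // mirrors the UnsupportedOperationException thrown by the batch visitor
      throw new UnsupportedOperationException(
          "The transform " + transform + " is currently not supported.");
    }
    translator.translate(transform);
  }

  // Two stand-in "transforms" for the demo.
  static class ReadTransform {}
  static class MapTransform {}

  public static void main(String[] args) {
    register(ReadTransform.class, t -> System.out.println("translating a read"));
    register(MapTransform.class, t -> System.out.println("translating a map"));
    translate(new ReadTransform());
    translate(new MapTransform());
  }
}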
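
----------------------------------------------------------------------
transformSideInputs in the diff funnels several side-input streams into a single stream by wrapping each element in a RawUnionValue carrying an integer tag, and the ParDo.BoundMulti translation splits its output on the same kind of tag. A toy version of that tag-and-demultiplex idea, with hypothetical classes (only the role of RawUnionValue is mirrored here, not its real API):

import java.util.ArrayList;
import java.util.List;

public class TaggedUnionSketch {

  /** Pairs a payload with the integer tag of the logical stream it came from. */
  static final class UnionValue {
    final int tag;
    final Object value;

    UnionValue(int tag, Object value) {
      this.tag = tag;
      this.value = value;
    }
  }

  public static void main(String[] args) {
    // "Union" two logical streams into one, as transformSideInputs does.
    int namesTag = 0;
    int countsTag = 1;
    List<UnionValue> union = new ArrayList<>();
    for (String name : new String[] {"a", "b"}) {
      union.add(new UnionValue(namesTag, name));
    }
    for (int count : new int[] {1, 2}) {
      union.add(new UnionValue(countsTag, count));
    }

    // Demultiplex on the tag, as the OutputSelector in the split step does.
    for (UnionValue v : union) {
      if (v.tag == namesTag) {
        System.out.println("name: " + v.value);
      } else {
        System.out.println("count: " + v.value);
      }
    }
  }
}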
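
----------------------------------------------------------------------
PipelineTranslationOptimizer above defaults to one TranslationMode and flips to STREAMING as soon as it visits a Read.Unbounded, while an explicit streaming option always wins. A toy version of that decision, using hypothetical transform classes in place of a real pipeline walk:

import java.util.Arrays;
import java.util.List;

public class TranslationModeSketch {

  enum TranslationMode { BATCH, STREAMING }

  static class BoundedRead {}
  static class UnboundedRead {}

  /** Defaults to BATCH; any unbounded read forces STREAMING, mirroring the visitor. */
  static TranslationMode detectMode(List<?> transforms, boolean streamingFlag) {
    if (streamingFlag) {
      return TranslationMode.STREAMING; // user-specified option wins, as in getTranslationMode()
    }
    TranslationMode mode = TranslationMode.BATCH;
    for (Object t : transforms) {
      if (t instanceof UnboundedRead) {
        mode = TranslationMode.STREAMING;
      }
    }
    return mode;
  }

  public static void main(String[] args) {
    System.out.println(detectMode(Arrays.asList(new BoundedRead()), false));   // BATCH
    System.out.println(detectMode(Arrays.asList(new UnboundedRead()), false)); // STREAMING
    System.out.println(detectMode(Arrays.asList(new BoundedRead()), true));    // STREAMING
  }
}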