From: mxm@apache.org
To: commits@beam.incubator.apache.org
Date: Tue, 15 Mar 2016 16:06:59 -0000
In-Reply-To: <90857e784edf4b17817091c852037577@git.apache.org>
References: <90857e784edf4b17817091c852037577@git.apache.org>
Subject: [04/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
deleted file mode 100644
index fe77e64..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.TimerInternals;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.ImmutableList;
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Encapsulates a {@link com.google.cloud.dataflow.sdk.transforms.DoFn}
- * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
- */
-public class FlinkDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, OUT> {
-
-  private final DoFn<IN, OUT> doFn;
-  private transient PipelineOptions options;
-
-  public FlinkDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options) {
-    this.doFn = doFn;
-    this.options = options;
-  }
-
-  private void writeObject(ObjectOutputStream out)
-      throws IOException, ClassNotFoundException {
-    out.defaultWriteObject();
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.writeValue(out, options);
-  }
-
-  private void readObject(ObjectInputStream in)
-      throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    ObjectMapper mapper = new ObjectMapper();
-    options = mapper.readValue(in, PipelineOptions.class);
-  }
-
-  @Override
-  public void mapPartition(Iterable<IN> values, Collector<OUT> out) throws Exception {
-    ProcessContext context = new ProcessContext(doFn, out);
-    this.doFn.startBundle(context);
-    for (IN value : values) {
-      context.inValue = value;
-      doFn.processElement(context);
-    }
-    this.doFn.finishBundle(context);
-  }
-
-  private class ProcessContext extends DoFn<IN, OUT>.ProcessContext {
-
-    IN inValue;
-    Collector<OUT> outCollector;
-
-    public ProcessContext(DoFn<IN, OUT> fn, Collector<OUT> outCollector) {
-      fn.super();
-      super.setupDelegateAggregators();
-      this.outCollector = outCollector;
-    }
-
-    @Override
-    public IN element() {
-      return this.inValue;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return Instant.now();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return GlobalWindow.INSTANCE;
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return PaneInfo.NO_FIRING;
-    }
-
-    @Override
-    public WindowingInternals<IN, OUT> windowingInternals() {
-      return new WindowingInternals<IN, OUT>() {
-        @Override
-        public StateInternals stateInternals() {
-          return null;
-        }
-
-        @Override
-        public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return null;
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return ImmutableList.of(GlobalWindow.INSTANCE);
-        }
-
-        @Override
-        public PaneInfo pane() {
-          return PaneInfo.NO_FIRING;
-        }
-
-        @Override
-        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          throw new RuntimeException("sideInput() not implemented.");
-        }
-      };
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      List<T> sideInput = getRuntimeContext().getBroadcastVariable(view.getTagInternal().getId());
-      List<WindowedValue<T>> windowedValueList = new ArrayList<>(sideInput.size());
-      for (T input : sideInput) {
-        windowedValueList.add(WindowedValue.of(input, Instant.now(), ImmutableList.of(GlobalWindow.INSTANCE), pane()));
-      }
-      return view.fromIterableInternal(windowedValueList);
-    }
-
-    @Override
-    public void output(OUT output) {
-      outCollector.collect(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OUT output, Instant timestamp) {
-      // not Flink's way, just output normally
-      output(output);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      // ignore the side output, this can happen when a user does not register
-      // side outputs but then outputs using a freshly created TupleTag.
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      sideOutput(tag, output);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new SerializableFnAggregatorWrapper<>(combiner);
-      getRuntimeContext().addAccumulator(name, wrapper);
-      return wrapper;
-    }
-
-  }
-}
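
For orientation while these files move, here is a minimal sketch of how a DoFn-backed map-partition operator could be wired into a plain Flink DataSet program. It is not part of the commit: ToUpperCaseFn, the inline data, and the environment setup are invented for illustration; only FlinkDoFnFunction and CoderTypeInformation come from the runner code in this diff, and the explicit returns(...) call reflects the fact that Flink cannot extract the erased IN/OUT types on its own.

    import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
    import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class DoFnWiringSketch {
      // A trivial DoFn: upper-cases every element.
      static class ToUpperCaseFn extends DoFn<String, String> {
        @Override
        public void processElement(ProcessContext c) {
          c.output(c.element().toUpperCase());
        }
      }

      public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> words = env.fromElements("flink", "dataflow", "beam");
        // Wrap the DoFn in the runner's map-partition function and attach
        // coder-backed type information for the output type.
        DataSet<String> result = words
            .mapPartition(new FlinkDoFnFunction<>(new ToUpperCaseFn(), PipelineOptionsFactory.create()))
            .returns(new CoderTypeInformation<>(StringUtf8Coder.of()));
        result.print();
      }
    }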

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java
deleted file mode 100644
index f92f888..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.cloud.dataflow.sdk.values.KV;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.Iterator;
-
-/**
- * Flink {@link org.apache.flink.api.common.functions.GroupReduceFunction} for executing a
- * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey} operation. This reads the input
- * {@link com.google.cloud.dataflow.sdk.values.KV} elements, extracts the key and collects
- * the values in a {@code List}.
- */
-public class FlinkKeyedListAggregationFunction<K, V> implements GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> {
-
-  @Override
-  public void reduce(Iterable<KV<K, V>> values, Collector<KV<K, Iterable<V>>> out) throws Exception {
-    Iterator<KV<K, V>> it = values.iterator();
-    KV<K, V> first = it.next();
-    Iterable<V> passThrough = new PassThroughIterable<>(first, it);
-    out.collect(KV.of(first.getKey(), passThrough));
-  }
-
-  private static class PassThroughIterable<K, V> implements Iterable<V>, Iterator<V> {
-    private KV<K, V> first;
-    private Iterator<KV<K, V>> iterator;
-
-    public PassThroughIterable(KV<K, V> first, Iterator<KV<K, V>> iterator) {
-      this.first = first;
-      this.iterator = iterator;
-    }
-
-    @Override
-    public Iterator<V> iterator() {
-      return this;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return first != null || iterator.hasNext();
-    }
-
-    @Override
-    public V next() {
-      if (first != null) {
-        V result = first.getValue();
-        first = null;
-        return result;
-      } else {
-        return iterator.next().getValue();
-      }
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("Cannot remove elements from input.");
-    }
-  }
-}
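
The PassThroughIterable trick above, consuming the first element to read the key and then splicing it back in front of the remaining iterator, is easy to get wrong, so a small self-contained illustration may help. This sketch is not from the commit (the class and method names are invented); note that, exactly like the original, the resulting Iterable supports only a single pass.

    import java.util.Arrays;
    import java.util.Iterator;

    public class PeekRestoreSketch {
      // Re-attaches an already-consumed head element in front of the rest of an iterator.
      static <T> Iterable<T> withHead(T head, Iterator<T> rest) {
        return () -> new Iterator<T>() {
          private T first = head;  // pending head element; null once emitted
          @Override public boolean hasNext() { return first != null || rest.hasNext(); }
          @Override public T next() {
            if (first != null) { T result = first; first = null; return result; }
            return rest.next();
          }
        };
      }

      public static void main(String[] args) {
        Iterator<String> it = Arrays.asList("a", "b", "c").iterator();
        String head = it.next();               // peek, e.g. to extract a key
        for (String s : withHead(head, it)) {  // single pass only, like the original
          System.out.println(s);
        }
      }
    }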

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
deleted file mode 100644
index ca667ee..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.ImmutableList;
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Encapsulates a {@link com.google.cloud.dataflow.sdk.transforms.DoFn} that uses side outputs
- * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
- *
- * We get a mapping from {@link com.google.cloud.dataflow.sdk.values.TupleTag} to output index
- * and must tag all outputs with the output number. Afterwards, a filter removes those
- * elements that do not belong in a given output.
- */
-public class FlinkMultiOutputDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, RawUnionValue> {
-
-  private final DoFn<IN, OUT> doFn;
-  private transient PipelineOptions options;
-  private final Map<TupleTag<?>, Integer> outputMap;
-
-  public FlinkMultiOutputDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options, Map<TupleTag<?>, Integer> outputMap) {
-    this.doFn = doFn;
-    this.options = options;
-    this.outputMap = outputMap;
-  }
-
-  private void writeObject(ObjectOutputStream out)
-      throws IOException, ClassNotFoundException {
-    out.defaultWriteObject();
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.writeValue(out, options);
-  }
-
-  private void readObject(ObjectInputStream in)
-      throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    ObjectMapper mapper = new ObjectMapper();
-    options = mapper.readValue(in, PipelineOptions.class);
-  }
-
-  @Override
-  public void mapPartition(Iterable<IN> values, Collector<RawUnionValue> out) throws Exception {
-    ProcessContext context = new ProcessContext(doFn, out);
-    this.doFn.startBundle(context);
-    for (IN value : values) {
-      context.inValue = value;
-      doFn.processElement(context);
-    }
-    this.doFn.finishBundle(context);
-  }
-
-  private class ProcessContext extends DoFn<IN, OUT>.ProcessContext {
-
-    IN inValue;
-    Collector<RawUnionValue> outCollector;
-
-    public ProcessContext(DoFn<IN, OUT> fn, Collector<RawUnionValue> outCollector) {
-      fn.super();
-      this.outCollector = outCollector;
-    }
-
-    @Override
-    public IN element() {
-      return this.inValue;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return Instant.now();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return GlobalWindow.INSTANCE;
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return PaneInfo.NO_FIRING;
-    }
-
-    @Override
-    public WindowingInternals<IN, OUT> windowingInternals() {
-      return null;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      List<T> sideInput = getRuntimeContext().getBroadcastVariable(view.getTagInternal()
-          .getId());
-      List<WindowedValue<T>> windowedValueList = new ArrayList<>(sideInput.size());
-      for (T input : sideInput) {
-        windowedValueList.add(WindowedValue.of(input, Instant.now(), ImmutableList.of(GlobalWindow.INSTANCE), pane()));
-      }
-      return view.fromIterableInternal(windowedValueList);
-    }
-
-    @Override
-    public void output(OUT value) {
-      // assume that index 0 is the default output
-      outCollector.collect(new RawUnionValue(0, value));
-    }
-
-    @Override
-    public void outputWithTimestamp(OUT output, Instant timestamp) {
-      // not Flink's way, just output normally
-      output(output);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public <T> void sideOutput(TupleTag<T> tag, T value) {
-      Integer index = outputMap.get(tag);
-      if (index != null) {
-        outCollector.collect(new RawUnionValue(index, value));
-      }
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      sideOutput(tag, output);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new SerializableFnAggregatorWrapper<>(combiner);
-      getRuntimeContext().addAccumulator(name, wrapper);
-      return wrapper;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
deleted file mode 100644
index 37de37e..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * A FlatMap function that filters out those elements that don't belong in this output. We need
- * this to implement MultiOutput ParDo functions.
- */
-public class FlinkMultiOutputPruningFunction<T> implements FlatMapFunction<RawUnionValue, T> {
-
-  private final int outputTag;
-
-  public FlinkMultiOutputPruningFunction(int outputTag) {
-    this.outputTag = outputTag;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void flatMap(RawUnionValue rawUnionValue, Collector<T> collector) throws Exception {
-    if (rawUnionValue.getUnionTag() == outputTag) {
-      collector.collect((T) rawUnionValue.getValue());
-    }
-  }
-}
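
A minimal sketch (not part of the commit) of the tag-to-index convention the two preceding classes implement: the multi-output DoFn wrapper emits every element as RawUnionValue(index, element), and one pruning function per declared output recovers a typed DataSet. The inline data and the BasicTypeInfo usage are illustrative assumptions.

    import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
    import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class MultiOutputPruningSketch {
      public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Index 0 is the default (main) output, index 1 a side output,
        // mirroring output() and sideOutput() in FlinkMultiOutputDoFnFunction.
        DataSet<RawUnionValue> tagged = env.fromElements(
            new RawUnionValue(0, "main-a"),
            new RawUnionValue(1, 42),
            new RawUnionValue(0, "main-b"));
        // One pruning pass per output turns the union back into a typed stream.
        DataSet<String> mainOutput = tagged
            .flatMap(new FlinkMultiOutputPruningFunction<String>(0))
            .returns(BasicTypeInfo.STRING_TYPE_INFO);
        mainOutput.print();  // prints only the elements tagged with 0
      }
    }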

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
deleted file mode 100644
index 2de681b..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.values.KV;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.Iterator;
-
-/**
- * Flink {@link org.apache.flink.api.common.functions.GroupCombineFunction} for executing a
- * {@link com.google.cloud.dataflow.sdk.transforms.Combine.PerKey} operation. This reads the input
- * {@link com.google.cloud.dataflow.sdk.values.KV} elements of type {@code VI}, extracts the key,
- * and emits accumulated values of the intermediate type {@code VA}.
- */
-public class FlinkPartialReduceFunction<K, VI, VA> implements GroupCombineFunction<KV<K, VI>, KV<K, VA>> {
-
-  private final Combine.KeyedCombineFn<K, VI, VA, ?> keyedCombineFn;
-
-  public FlinkPartialReduceFunction(Combine.KeyedCombineFn<K, VI, VA, ?>
-      keyedCombineFn) {
-    this.keyedCombineFn = keyedCombineFn;
-  }
-
-  @Override
-  public void combine(Iterable<KV<K, VI>> elements, Collector<KV<K, VA>> out) throws Exception {
-
-    final Iterator<KV<K, VI>> iterator = elements.iterator();
-    // create accumulator using the first element's key
-    KV<K, VI> first = iterator.next();
-    K key = first.getKey();
-    VI value = first.getValue();
-    VA accumulator = keyedCombineFn.createAccumulator(key);
-    accumulator = keyedCombineFn.addInput(key, accumulator, value);
-
-    while (iterator.hasNext()) {
-      value = iterator.next().getValue();
-      accumulator = keyedCombineFn.addInput(key, accumulator, value);
-    }
-
-    out.collect(KV.of(key, accumulator));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
deleted file mode 100644
index 29193a2..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.collect.ImmutableList;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.Iterator;
-
-/**
- * Flink {@link org.apache.flink.api.common.functions.GroupReduceFunction} for executing a
- * {@link com.google.cloud.dataflow.sdk.transforms.Combine.PerKey} operation. This reads the input
- * {@link com.google.cloud.dataflow.sdk.values.KV} elements, extracts the key and merges the
- * accumulators resulting from the PartialReduce which produced the input VA.
- */
-public class FlinkReduceFunction<K, VA, VO> implements GroupReduceFunction<KV<K, VA>, KV<K, VO>> {
-
-  private final Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn;
-
-  public FlinkReduceFunction(Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn) {
-    this.keyedCombineFn = keyedCombineFn;
-  }
-
-  @Override
-  public void reduce(Iterable<KV<K, VA>> values, Collector<KV<K, VO>> out) throws Exception {
-    Iterator<KV<K, VA>> it = values.iterator();
-
-    KV<K, VA> current = it.next();
-    K k = current.getKey();
-    VA accumulator = current.getValue();
-
-    while (it.hasNext()) {
-      current = it.next();
-      accumulator = keyedCombineFn.mergeAccumulators(k, ImmutableList.of(accumulator, current.getValue()));
-    }
-
-    out.collect(KV.of(k, keyedCombineFn.extractOutput(k, accumulator)));
-  }
-}
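
The division of labor between FlinkPartialReduceFunction (addInput per element) and FlinkReduceFunction (mergeAccumulators, then extractOutput) follows the Dataflow combiner lifecycle. Here is a minimal sketch, not from the commit, with a hypothetical mean-of-longs KeyedCombineFn whose accumulator is a {sum, count} pair; it exercises the same four calls the two functions above make.

    import com.google.cloud.dataflow.sdk.transforms.Combine;
    import com.google.common.collect.ImmutableList;

    public class TwoPhaseCombineSketch {
      static class MeanFn extends Combine.KeyedCombineFn<String, Long, long[], Double> {
        @Override public long[] createAccumulator(String key) { return new long[] {0, 0}; }
        @Override public long[] addInput(String key, long[] acc, Long input) {
          acc[0] += input; acc[1] += 1; return acc;
        }
        @Override public long[] mergeAccumulators(String key, Iterable<long[]> accs) {
          long[] merged = new long[] {0, 0};
          for (long[] a : accs) { merged[0] += a[0]; merged[1] += a[1]; }
          return merged;
        }
        @Override public Double extractOutput(String key, long[] acc) {
          return acc[1] == 0 ? 0.0 : ((double) acc[0]) / acc[1];
        }
      }

      public static void main(String[] args) {
        MeanFn fn = new MeanFn();
        // Partial phase (FlinkPartialReduceFunction): per-group accumulation.
        long[] acc1 = fn.addInput("k", fn.addInput("k", fn.createAccumulator("k"), 1L), 2L);
        long[] acc2 = fn.addInput("k", fn.createAccumulator("k"), 3L);
        // Final phase (FlinkReduceFunction): merge partial accumulators, then extract.
        double mean = fn.extractOutput("k", fn.mergeAccumulators("k", ImmutableList.of(acc1, acc2)));
        System.out.println(mean);  // 2.0
      }
    }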

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java
deleted file mode 100644
index 05f4415..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
-import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-
-/**
- * A UnionCoder encodes RawUnionValues.
- *
- * This file is copied from {@link com.google.cloud.dataflow.sdk.transforms.join.UnionCoder}.
- */
-@SuppressWarnings("serial")
-public class UnionCoder extends StandardCoder<RawUnionValue> {
-  // TODO: Think about how to integrate this with a schema object (i.e.
-  // a tuple of tuple tags).
-  /**
-   * Builds a union coder with the given list of element coders. This list
-   * corresponds to a mapping of union tag to Coder. Union tags start at 0.
-   */
-  public static UnionCoder of(List<Coder<?>> elementCoders) {
-    return new UnionCoder(elementCoders);
-  }
-
-  @JsonCreator
-  public static UnionCoder jsonOf(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> elements) {
-    return UnionCoder.of(elements);
-  }
-
-  private int getIndexForEncoding(RawUnionValue union) {
-    if (union == null) {
-      throw new IllegalArgumentException("cannot encode a null tagged union");
-    }
-    int index = union.getUnionTag();
-    if (index < 0 || index >= elementCoders.size()) {
-      throw new IllegalArgumentException(
-          "union value index " + index + " not in range [0.." +
-          (elementCoders.size() - 1) + "]");
-    }
-    return index;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void encode(
-      RawUnionValue union,
-      OutputStream outStream,
-      Context context)
-      throws IOException {
-    int index = getIndexForEncoding(union);
-    // Write out the union tag.
-    VarInt.encode(index, outStream);
-
-    // Write out the actual value.
-    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-    coder.encode(
-        union.getValue(),
-        outStream,
-        context);
-  }
-
-  @Override
-  public RawUnionValue decode(InputStream inStream, Context context)
-      throws IOException {
-    int index = VarInt.decodeInt(inStream);
-    Object value = elementCoders.get(index).decode(inStream, context);
-    return new RawUnionValue(index, value);
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return null;
-  }
-
-  @Override
-  public List<? extends Coder<?>> getComponents() {
-    return elementCoders;
-  }
-
-  /**
-   * Since this coder uses elementCoders.get(index) and coders that are known to run in constant
-   * time, we defer the return value to that coder.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) {
-    int index = getIndexForEncoding(union);
-    @SuppressWarnings("unchecked")
-    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-    return coder.isRegisterByteSizeObserverCheap(union.getValue(), context);
-  }
-
-  /**
-   * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder.
-   */
-  @Override
-  public void registerByteSizeObserver(
-      RawUnionValue union, ElementByteSizeObserver observer, Context context)
-      throws Exception {
-    int index = getIndexForEncoding(union);
-    // Write out the union tag.
-    observer.update(VarInt.getLength(index));
-    // Write out the actual value.
-    @SuppressWarnings("unchecked")
-    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-    coder.registerByteSizeObserver(union.getValue(), observer, context);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private final List<Coder<?>> elementCoders;
-
-  private UnionCoder(List<Coder<?>> elementCoders) {
-    this.elementCoders = elementCoders;
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(
-        "UnionCoder is only deterministic if all element coders are",
-        elementCoders);
-  }
-}
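
A minimal round-trip sketch for the coder above (not part of the commit; the tag assignment to String and Long is invented): the encoding is simply a VarInt union tag followed by the element encoded with the coder registered for that tag.

    import com.google.cloud.dataflow.sdk.coders.Coder;
    import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
    import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
    import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
    import org.apache.beam.runners.flink.translation.functions.UnionCoder;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.Arrays;

    public class UnionCoderRoundTrip {
      public static void main(String[] args) throws Exception {
        // Tag 0 -> String, tag 1 -> Long, mirroring a two-output ParDo.
        UnionCoder coder = UnionCoder.of(
            Arrays.<Coder<?>>asList(StringUtf8Coder.of(), VarLongCoder.of()));

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        coder.encode(new RawUnionValue(1, 42L), bytes, Coder.Context.OUTER);

        RawUnionValue decoded = coder.decode(
            new ByteArrayInputStream(bytes.toByteArray()), Coder.Context.OUTER);
        System.out.println(decoded.getUnionTag() + " -> " + decoded.getValue());  // 1 -> 42
      }
    }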

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java
deleted file mode 100644
index 1249036..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for
- * {@link com.google.cloud.dataflow.sdk.coders.Coder}.
- */
-public class CoderComparator<T> extends TypeComparator<T> {
-
-  private Coder<T> coder;
-
-  // We use these for internal encoding/decoding for creating copies and comparing
-  // serialized forms using a Coder
-  private transient InspectableByteArrayOutputStream buffer1;
-  private transient InspectableByteArrayOutputStream buffer2;
-
-  // For storing the Reference in encoded form
-  private transient InspectableByteArrayOutputStream referenceBuffer;
-
-  public CoderComparator(Coder<T> coder) {
-    this.coder = coder;
-    buffer1 = new InspectableByteArrayOutputStream();
-    buffer2 = new InspectableByteArrayOutputStream();
-    referenceBuffer = new InspectableByteArrayOutputStream();
-  }
-
-  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    buffer1 = new InspectableByteArrayOutputStream();
-    buffer2 = new InspectableByteArrayOutputStream();
-    referenceBuffer = new InspectableByteArrayOutputStream();
-  }
-
-  @Override
-  public int hash(T record) {
-    return record.hashCode();
-  }
-
-  @Override
-  public void setReference(T toCompare) {
-    referenceBuffer.reset();
-    try {
-      coder.encode(toCompare, referenceBuffer, Coder.Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not set reference " + toCompare + ": " + e);
-    }
-  }
-
-  @Override
-  public boolean equalToReference(T candidate) {
-    try {
-      buffer2.reset();
-      coder.encode(candidate, buffer2, Coder.Context.OUTER);
-      byte[] arr = referenceBuffer.getBuffer();
-      byte[] arrOther = buffer2.getBuffer();
-      if (referenceBuffer.size() != buffer2.size()) {
-        return false;
-      }
-      int len = buffer2.size();
-      for (int i = 0; i < len; i++) {
-        if (arr[i] != arrOther[i]) {
-          return false;
-        }
-      }
-      return true;
-    } catch (IOException e) {
-      throw new RuntimeException("Could not compare reference.", e);
-    }
-  }
-
-  @Override
-  public int compareToReference(TypeComparator<T> other) {
-    InspectableByteArrayOutputStream otherReferenceBuffer = ((CoderComparator<T>) other).referenceBuffer;
-
-    byte[] arr = referenceBuffer.getBuffer();
-    byte[] arrOther = otherReferenceBuffer.getBuffer();
-    if (referenceBuffer.size() != otherReferenceBuffer.size()) {
-      return referenceBuffer.size() - otherReferenceBuffer.size();
-    }
-    int len = referenceBuffer.size();
-    for (int i = 0; i < len; i++) {
-      if (arr[i] != arrOther[i]) {
-        return arr[i] - arrOther[i];
-      }
-    }
-    return 0;
-  }
-
-  @Override
-  public int compare(T first, T second) {
-    try {
-      buffer1.reset();
-      buffer2.reset();
-      coder.encode(first, buffer1, Coder.Context.OUTER);
-      coder.encode(second, buffer2, Coder.Context.OUTER);
-      byte[] arr = buffer1.getBuffer();
-      byte[] arrOther = buffer2.getBuffer();
-      if (buffer1.size() != buffer2.size()) {
-        return buffer1.size() - buffer2.size();
-      }
-      int len = buffer1.size();
-      for (int i = 0; i < len; i++) {
-        if (arr[i] != arrOther[i]) {
-          return arr[i] - arrOther[i];
-        }
-      }
-      return 0;
-    } catch (IOException e) {
-      throw new RuntimeException("Could not compare: ", e);
-    }
-  }
-
-  @Override
-  public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-    CoderTypeSerializer<T> serializer = new CoderTypeSerializer<>(coder);
-    T first = serializer.deserialize(firstSource);
-    T second = serializer.deserialize(secondSource);
-    return compare(first, second);
-  }
-
-  @Override
-  public boolean supportsNormalizedKey() {
-    return true;
-  }
-
-  @Override
-  public boolean supportsSerializationWithKeyNormalization() {
-    return false;
-  }
-
-  @Override
-  public int getNormalizeKeyLen() {
-    return Integer.MAX_VALUE;
-  }
-
-  @Override
-  public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-    return true;
-  }
-
-  @Override
-  public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-    buffer1.reset();
-    try {
-      coder.encode(record, buffer1, Coder.Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not serialize " + record + " using coder " + coder + ": " + e);
-    }
-    final byte[] data = buffer1.getBuffer();
-    final int limit = offset + numBytes;
-
-    target.put(offset, data, 0, Math.min(numBytes, buffer1.size()));
-
-    offset += buffer1.size();
-
-    while (offset < limit) {
-      target.put(offset++, (byte) 0);
-    }
-  }
-
-  @Override
-  public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean invertNormalizedKey() {
-    return false;
-  }
-
-  @Override
-  public TypeComparator<T> duplicate() {
-    return new CoderComparator<>(coder);
-  }
-
-  @Override
-  public int extractKeys(Object record, Object[] target, int index) {
-    target[index] = record;
-    return 1;
-  }
-
-  @Override
-  public TypeComparator[] getFlatComparators() {
-    return new TypeComparator[] { this.duplicate() };
-  }
-}
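
A short sketch (not from the commit; the sample strings are invented) of what this comparator actually orders: the encoded form, comparing length first and then bytes. For equal-length UTF-8 strings that coincides with lexicographic order, but it is an encoding order in general.

    import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
    import org.apache.beam.runners.flink.translation.types.CoderComparator;

    public class CoderComparatorSketch {
      public static void main(String[] args) {
        CoderComparator<String> cmp = new CoderComparator<>(StringUtf8Coder.of());
        // Both values are encoded with Coder.Context.OUTER and the byte arrays compared.
        System.out.println(cmp.compare("apple", "apply") < 0);  // true: bytes differ at 'e' vs 'y'
        cmp.setReference("apple");
        System.out.println(cmp.equalToReference("apple"));      // true: identical encodings
      }
    }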

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
deleted file mode 100644
index f9d4dcd..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import com.google.common.base.Preconditions;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for
- * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s.
- */
-public class CoderTypeInformation<T> extends TypeInformation<T> implements AtomicType<T> {
-
-  private final Coder<T> coder;
-
-  public CoderTypeInformation(Coder<T> coder) {
-    Preconditions.checkNotNull(coder);
-    this.coder = coder;
-  }
-
-  @Override
-  public boolean isBasicType() {
-    return false;
-  }
-
-  @Override
-  public boolean isTupleType() {
-    return false;
-  }
-
-  @Override
-  public int getArity() {
-    return 1;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public Class<T> getTypeClass() {
-    // We don't have the Class, so we have to pass null here. What a shame...
-    return (Class<T>) Object.class;
-  }
-
-  @Override
-  public boolean isKeyType() {
-    return true;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-    if (coder instanceof VoidCoder) {
-      return (TypeSerializer<T>) new VoidCoderTypeSerializer();
-    }
-    return new CoderTypeSerializer<>(coder);
-  }
-
-  @Override
-  public int getTotalFields() {
-    return 2;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    CoderTypeInformation that = (CoderTypeInformation) o;
-
-    return coder.equals(that.coder);
-
-  }
-
-  @Override
-  public int hashCode() {
-    return coder.hashCode();
-  }
-
-  @Override
-  public boolean canEqual(Object obj) {
-    return obj instanceof CoderTypeInformation;
-  }
-
-  @Override
-  public String toString() {
-    return "CoderTypeInformation{" +
-        "coder=" + coder +
-        '}';
-  }
-
-  @Override
-  public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
-    return new CoderComparator<>(coder);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
deleted file mode 100644
index 4e81054..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for
- * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s.
- */
-public class CoderTypeSerializer<T> extends TypeSerializer<T> {
-
-  private Coder<T> coder;
-  private transient DataInputViewWrapper inputWrapper;
-  private transient DataOutputViewWrapper outputWrapper;
-
-  // We use this for internal encoding/decoding for creating copies using the Coder.
-  private transient InspectableByteArrayOutputStream buffer;
-
-  public CoderTypeSerializer(Coder<T> coder) {
-    this.coder = coder;
-    this.inputWrapper = new DataInputViewWrapper(null);
-    this.outputWrapper = new DataOutputViewWrapper(null);
-
-    buffer = new InspectableByteArrayOutputStream();
-  }
-
-  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    this.inputWrapper = new DataInputViewWrapper(null);
-    this.outputWrapper = new DataOutputViewWrapper(null);
-
-    buffer = new InspectableByteArrayOutputStream();
-  }
-
-  @Override
-  public boolean isImmutableType() {
-    return false;
-  }
-
-  @Override
-  public CoderTypeSerializer<T> duplicate() {
-    return new CoderTypeSerializer<>(coder);
-  }
-
-  @Override
-  public T createInstance() {
-    return null;
-  }
-
-  @Override
-  public T copy(T t) {
-    buffer.reset();
-    try {
-      coder.encode(t, buffer, Coder.Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not copy.", e);
-    }
-    try {
-      return coder.decode(new ByteArrayInputStream(buffer.getBuffer(), 0, buffer.size()), Coder.Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not copy.", e);
-    }
-  }
-
-  @Override
-  public T copy(T t, T reuse) {
-    return copy(t);
-  }
-
-  @Override
-  public int getLength() {
-    return 0;
-  }
-
-  @Override
-  public void serialize(T t, DataOutputView dataOutputView) throws IOException {
-    outputWrapper.setOutputView(dataOutputView);
-    coder.encode(t, outputWrapper, Coder.Context.NESTED);
-  }
-
-  @Override
-  public T deserialize(DataInputView dataInputView) throws IOException {
-    try {
-      inputWrapper.setInputView(dataInputView);
-      return coder.decode(inputWrapper, Coder.Context.NESTED);
-    } catch (CoderException e) {
-      Throwable cause = e.getCause();
-      if (cause instanceof EOFException) {
-        throw (EOFException) cause;
-      } else {
-        throw e;
-      }
-    }
-  }
-
-  @Override
-  public T deserialize(T t, DataInputView dataInputView) throws IOException {
-    return deserialize(dataInputView);
-  }
-
-  @Override
-  public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
-    serialize(deserialize(dataInputView), dataOutputView);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    CoderTypeSerializer that = (CoderTypeSerializer) o;
-    return coder.equals(that.coder);
-  }
-
-  @Override
-  public boolean canEqual(Object obj) {
-    return obj instanceof CoderTypeSerializer;
-  }
-
-  @Override
-  public int hashCode() {
-    return coder.hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
deleted file mode 100644
index 36b5ba3..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import java.io.ByteArrayOutputStream;
-
-/**
- * Version of {@link java.io.ByteArrayOutputStream} that allows retrieving the internal
- * byte[] buffer without incurring an array copy.
- */
-public class InspectableByteArrayOutputStream extends ByteArrayOutputStream {
-
-  /**
-   * Get the underlying byte array.
-   */
-  public byte[] getBuffer() {
-    return buf;
-  }
-}
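
A small sketch of the serializer's coder-backed deep copy (not from the commit; the ListCoder example is invented): copy() encodes the value with the coder and decodes it again, so any type with a coder gets copy semantics without Kryo or reflection.

    import com.google.cloud.dataflow.sdk.coders.ListCoder;
    import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
    import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;

    import java.util.Arrays;
    import java.util.List;

    public class CoderTypeSerializerSketch {
      public static void main(String[] args) {
        CoderTypeSerializer<List<Integer>> serializer =
            new CoderTypeSerializer<>(ListCoder.of(VarIntCoder.of()));
        List<Integer> original = Arrays.asList(1, 2, 3);
        List<Integer> copy = serializer.copy(original);  // encode + decode round trip
        System.out.println(copy + " " + (copy != original));  // [1, 2, 3] true
      }
    }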
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.types; - -import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.values.KV; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemorySegment; - -import java.io.IOException; -import java.io.ObjectInputStream; - -/** - * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for - * {@link com.google.cloud.dataflow.sdk.coders.KvCoder}. We have a special comparator - * for {@link KV} that always compares on the key only. - */ -public class KvCoderComperator extends TypeComparator> { - - private KvCoder coder; - private Coder keyCoder; - - // We use these for internal encoding/decoding for creating copies and comparing - // serialized forms using a Coder - private transient InspectableByteArrayOutputStream buffer1; - private transient InspectableByteArrayOutputStream buffer2; - - // For storing the Reference in encoded form - private transient InspectableByteArrayOutputStream referenceBuffer; - - - // For deserializing the key - private transient DataInputViewWrapper inputWrapper; - - public KvCoderComperator(KvCoder coder) { - this.coder = coder; - this.keyCoder = coder.getKeyCoder(); - - buffer1 = new InspectableByteArrayOutputStream(); - buffer2 = new InspectableByteArrayOutputStream(); - referenceBuffer = new InspectableByteArrayOutputStream(); - - inputWrapper = new DataInputViewWrapper(null); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - buffer1 = new InspectableByteArrayOutputStream(); - buffer2 = new InspectableByteArrayOutputStream(); - referenceBuffer = new InspectableByteArrayOutputStream(); - - inputWrapper = new DataInputViewWrapper(null); - } - - @Override - public int hash(KV record) { - K key = record.getKey(); - if (key != null) { - return key.hashCode(); - } else { - return 0; - } - } - - @Override - public void setReference(KV toCompare) { - referenceBuffer.reset(); - try { - keyCoder.encode(toCompare.getKey(), referenceBuffer, Coder.Context.OUTER); - } catch (IOException e) { - throw new RuntimeException("Could not set reference " + toCompare + ": " + e); - } - } - - @Override - public boolean equalToReference(KV candidate) { - try { - buffer2.reset(); - keyCoder.encode(candidate.getKey(), buffer2, Coder.Context.OUTER); - byte[] arr = referenceBuffer.getBuffer(); - byte[] arrOther = buffer2.getBuffer(); - if (referenceBuffer.size() != buffer2.size()) { - return false; - } - int len = buffer2.size(); - for(int i = 0; i < len; i++ ) { - if (arr[i] != arrOther[i]) { - return false; - } - } - return true; - } catch (IOException e) { - throw new RuntimeException("Could not compare reference.", e); - } - } - - @Override - public int compareToReference(TypeComparator> other) { - 
InspectableByteArrayOutputStream otherReferenceBuffer = ((KvCoderComperator) other).referenceBuffer; - - byte[] arr = referenceBuffer.getBuffer(); - byte[] arrOther = otherReferenceBuffer.getBuffer(); - if (referenceBuffer.size() != otherReferenceBuffer.size()) { - return referenceBuffer.size() - otherReferenceBuffer.size(); - } - int len = referenceBuffer.size(); - for (int i = 0; i < len; i++) { - if (arr[i] != arrOther[i]) { - return arr[i] - arrOther[i]; - } - } - return 0; - } - - - @Override - public int compare(KV first, KV second) { - try { - buffer1.reset(); - buffer2.reset(); - keyCoder.encode(first.getKey(), buffer1, Coder.Context.OUTER); - keyCoder.encode(second.getKey(), buffer2, Coder.Context.OUTER); - byte[] arr = buffer1.getBuffer(); - byte[] arrOther = buffer2.getBuffer(); - if (buffer1.size() != buffer2.size()) { - return buffer1.size() - buffer2.size(); - } - int len = buffer1.size(); - for(int i = 0; i < len; i++ ) { - if (arr[i] != arrOther[i]) { - return arr[i] - arrOther[i]; - } - } - return 0; - } catch (IOException e) { - throw new RuntimeException("Could not compare reference.", e); - } - } - - @Override - public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { - - inputWrapper.setInputView(firstSource); - K firstKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED); - inputWrapper.setInputView(secondSource); - K secondKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED); - - try { - buffer1.reset(); - buffer2.reset(); - keyCoder.encode(firstKey, buffer1, Coder.Context.OUTER); - keyCoder.encode(secondKey, buffer2, Coder.Context.OUTER); - byte[] arr = buffer1.getBuffer(); - byte[] arrOther = buffer2.getBuffer(); - if (buffer1.size() != buffer2.size()) { - return buffer1.size() - buffer2.size(); - } - int len = buffer1.size(); - for(int i = 0; i < len; i++ ) { - if (arr[i] != arrOther[i]) { - return arr[i] - arrOther[i]; - } - } - return 0; - } catch (IOException e) { - throw new RuntimeException("Could not compare reference.", e); - } - } - - @Override - public boolean supportsNormalizedKey() { - return true; - } - - @Override - public boolean supportsSerializationWithKeyNormalization() { - return false; - } - - @Override - public int getNormalizeKeyLen() { - return Integer.MAX_VALUE; - } - - @Override - public boolean isNormalizedKeyPrefixOnly(int keyBytes) { - return true; - } - - @Override - public void putNormalizedKey(KV record, MemorySegment target, int offset, int numBytes) { - buffer1.reset(); - try { - keyCoder.encode(record.getKey(), buffer1, Coder.Context.NESTED); - } catch (IOException e) { - throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e); - } - final byte[] data = buffer1.getBuffer(); - final int limit = offset + numBytes; - - int numBytesPut = Math.min(numBytes, buffer1.size()); - - target.put(offset, data, 0, numBytesPut); - - offset += numBytesPut; - - while (offset < limit) { - target.put(offset++, (byte) 0); - } - } - - @Override - public void writeWithKeyNormalization(KV record, DataOutputView target) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public KV readWithKeyDenormalization(KV reuse, DataInputView source) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public boolean invertNormalizedKey() { - return false; - } - - @Override - public TypeComparator> duplicate() { - return new KvCoderComperator<>(coder); - } - - @Override - public int extractKeys(Object 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java
deleted file mode 100644
index 8862d48..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.values.KV;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import com.google.common.base.Preconditions;
-
-import java.util.List;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for
- * Dataflow {@link com.google.cloud.dataflow.sdk.coders.KvCoder}.
- */
-public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> {
-
-  private KvCoder<K, V> coder;
-
-  // We don't have the Class, so we have to pass null here. What a shame...
-  private static Object DUMMY = new Object();
-
-  @SuppressWarnings("unchecked")
-  public KvCoderTypeInformation(KvCoder<K, V> coder) {
-    super((Class<KV<K, V>>) DUMMY.getClass());
-    this.coder = coder;
-    Preconditions.checkNotNull(coder);
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public TypeComparator<KV<K, V>> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig config) {
-    return new KvCoderComperator<>(coder);
-  }
-
-  @Override
-  public boolean isBasicType() {
-    return false;
-  }
-
-  @Override
-  public boolean isTupleType() {
-    return false;
-  }
-
-  @Override
-  public int getArity() {
-    return 2;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public Class<KV<K, V>> getTypeClass() {
-    return privateGetTypeClass();
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <K, V> Class<KV<K, V>> privateGetTypeClass() {
-    return (Class<KV<K, V>>) (Class<?>) Object.class;
-  }
-
-  @Override
-  public boolean isKeyType() {
-    return true;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public TypeSerializer<KV<K, V>> createSerializer(ExecutionConfig config) {
-    return new CoderTypeSerializer<>(coder);
-  }
-
-  @Override
-  public int getTotalFields() {
-    return 2;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    KvCoderTypeInformation that = (KvCoderTypeInformation) o;
-
-    return coder.equals(that.coder);
-  }
-
-  @Override
-  public int hashCode() {
-    return coder.hashCode();
-  }
-
-  @Override
-  public String toString() {
-    return "KvCoderTypeInformation{" +
-        "coder=" + coder +
-        '}';
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public <X> TypeInformation<X> getTypeAt(int pos) {
-    if (pos == 0) {
-      return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder());
-    } else if (pos == 1) {
-      return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder());
-    } else {
-      throw new RuntimeException("Invalid field position " + pos);
-    }
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
-    switch (fieldExpression) {
-      case "key":
-        return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder());
-      case "value":
-        return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder());
-      default:
-        throw new UnsupportedOperationException("Only KvCoder has fields.");
-    }
-  }
-
-  @Override
-  public String[] getFieldNames() {
-    return new String[]{"key", "value"};
-  }
-
-  @Override
-  public int getFieldIndex(String fieldName) {
-    switch (fieldName) {
-      case "key":
-        return 0;
-      case "value":
-        return 1;
-      default:
-        return -1;
-    }
-  }
-
-  @Override
-  public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
-    CoderTypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(coder.getKeyCoder());
-    result.add(new FlatFieldDescriptor(offset, keyTypeInfo));
-  }
-
-  @Override
-  protected TypeComparatorBuilder<KV<K, V>> createTypeComparatorBuilder() {
-    return new KvCoderTypeComparatorBuilder();
-  }
-
-  private class KvCoderTypeComparatorBuilder implements TypeComparatorBuilder<KV<K, V>> {
-
-    @Override
-    public void initializeTypeComparatorBuilder(int size) {}
-
-    @Override
-    public void addComparatorField(int fieldId, TypeComparator<?> comparator) {}
-
-    @Override
-    public TypeComparator<KV<K, V>> createTypeComparator(ExecutionConfig config) {
-      return new KvCoderComperator<>(coder);
-    }
-  }
-}
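
For context, a hedged sketch of how this type information would be built during translation; the coders are real Dataflow SDK classes, but the wiring described in the comments is assumed rather than taken from this commit:

    KvCoder<String, Integer> kvCoder =
        KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
    KvCoderTypeInformation<String, Integer> typeInfo =
        new KvCoderTypeInformation<>(kvCoder);
    // getTypeAt(0) / getTypeAt("key") exposes the key coder's type info,
    // getTypeAt(1) / getTypeAt("value") the value coder's, and
    // createComparator(...) yields the key-only KvCoderComperator that
    // Flink uses when grouping a DataSet of KV records.
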
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java
deleted file mode 100644
index 8bc3620..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-
-/**
- * Special Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for
- * {@link com.google.cloud.dataflow.sdk.coders.VoidCoder}. We need this because Flink does not
- * allow returning {@code null} from an input reader. We return a {@link VoidValue} instead
- * that behaves like a {@code null}, hopefully.
- */
-public class VoidCoderTypeSerializer extends TypeSerializer<VoidCoderTypeSerializer.VoidValue> {
-
-  @Override
-  public boolean isImmutableType() {
-    return false;
-  }
-
-  @Override
-  public VoidCoderTypeSerializer duplicate() {
-    return this;
-  }
-
-  @Override
-  public VoidValue createInstance() {
-    return VoidValue.INSTANCE;
-  }
-
-  @Override
-  public VoidValue copy(VoidValue from) {
-    return from;
-  }
-
-  @Override
-  public VoidValue copy(VoidValue from, VoidValue reuse) {
-    return from;
-  }
-
-  @Override
-  public int getLength() {
-    return 0;
-  }
-
-  @Override
-  public void serialize(VoidValue record, DataOutputView target) throws IOException {
-    target.writeByte(1);
-  }
-
-  @Override
-  public VoidValue deserialize(DataInputView source) throws IOException {
-    source.readByte();
-    return VoidValue.INSTANCE;
-  }
-
-  @Override
-  public VoidValue deserialize(VoidValue reuse, DataInputView source) throws IOException {
-    return deserialize(source);
-  }
-
-  @Override
-  public void copy(DataInputView source, DataOutputView target) throws IOException {
-    source.readByte();
-    target.writeByte(1);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof VoidCoderTypeSerializer) {
-      VoidCoderTypeSerializer other = (VoidCoderTypeSerializer) obj;
-      return other.canEqual(this);
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public boolean canEqual(Object obj) {
-    return obj instanceof VoidCoderTypeSerializer;
-  }
-
-  @Override
-  public int hashCode() {
-    return 0;
-  }
-
-  public static class VoidValue {
-    private VoidValue() {}
-
-    public static VoidValue INSTANCE = new VoidValue();
-  }
-}
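
The wire format above is worth spelling out: a void record still costs one placeholder byte, so readers and writers stay in step even though no payload exists. A rough usage sketch under that reading:

    VoidCoderTypeSerializer ser = new VoidCoderTypeSerializer();
    VoidCoderTypeSerializer.VoidValue v = ser.createInstance();
    // serialize(v, target) writes the single marker byte 1;
    // deserialize(source) consumes one byte and always returns the
    // shared VoidValue.INSTANCE singleton, standing in for null.
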
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java
deleted file mode 100644
index 445d411..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.accumulators.Accumulator;
-
-import java.io.Serializable;
-
-/**
- * Wrapper that wraps a {@link com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn}
- * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using
- * the combine function as an aggregator in a {@link com.google.cloud.dataflow.sdk.transforms.ParDo}
- * operation.
- */
-public class CombineFnAggregatorWrapper<AI, AA, AR> implements Aggregator<AI, AR>, Accumulator<AI, Serializable> {
-
-  private AA aa;
-  private Combine.CombineFn<AI, AA, AR> combiner;
-
-  public CombineFnAggregatorWrapper() {
-  }
-
-  public CombineFnAggregatorWrapper(Combine.CombineFn<AI, AA, AR> combiner) {
-    this.combiner = combiner;
-    this.aa = combiner.createAccumulator();
-  }
-
-  @Override
-  public void add(AI value) {
-    aa = combiner.addInput(aa, value);
-  }
-
-  @Override
-  public Serializable getLocalValue() {
-    return (Serializable) combiner.extractOutput(aa);
-  }
-
-  @Override
-  public void resetLocal() {
-    aa = combiner.createAccumulator();
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void merge(Accumulator<AI, Serializable> other) {
-    aa = combiner.mergeAccumulators(Lists.newArrayList(aa, ((CombineFnAggregatorWrapper<AI, AA, AR>) other).aa));
-  }
-
-  @Override
-  public Accumulator<AI, Serializable> clone() {
-    // copy it by merging
-    AA aaCopy = combiner.mergeAccumulators(Lists.newArrayList(aa));
-    CombineFnAggregatorWrapper<AI, AA, AR> result = new CombineFnAggregatorWrapper<>(combiner);
-    result.aa = aaCopy;
-    return result;
-  }
-
-  @Override
-  public void addValue(AI value) {
-    add(value);
-  }
-
-  @Override
-  public String getName() {
-    return "CombineFn: " + combiner.toString();
-  }
-
-  @Override
-  public Combine.CombineFn<AI, AA, AR> getCombineFn() {
-    return combiner;
-  }
-}
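
Because the wrapper is generic in input, accumulator, and result types, a toy integer-sum CombineFn makes the mechanics concrete. This is a hypothetical usage sketch, not code from this commit:

    Combine.CombineFn<Integer, int[], Integer> sumFn =
        new Combine.CombineFn<Integer, int[], Integer>() {
          @Override public int[] createAccumulator() { return new int[1]; }
          @Override public int[] addInput(int[] acc, Integer input) {
            acc[0] += input;
            return acc;
          }
          @Override public int[] mergeAccumulators(Iterable<int[]> accs) {
            int[] merged = new int[1];
            for (int[] acc : accs) { merged[0] += acc[0]; }
            return merged;
          }
          @Override public Integer extractOutput(int[] acc) { return acc[0]; }
        };

    CombineFnAggregatorWrapper<Integer, int[], Integer> agg =
        new CombineFnAggregatorWrapper<>(sumFn);
    agg.add(3);
    agg.add(4);
    // agg.getLocalValue() -> 7, which Flink collects as an accumulator result.
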
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
deleted file mode 100644
index 6a3cf50..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import org.apache.flink.core.memory.DataInputView;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Wrapper for {@link DataInputView}. We need this because Flink reads data using a
- * {@link org.apache.flink.core.memory.DataInputView} while
- * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s expect an
- * {@link java.io.InputStream}.
- */
-public class DataInputViewWrapper extends InputStream {
-
-  private DataInputView inputView;
-
-  public DataInputViewWrapper(DataInputView inputView) {
-    this.inputView = inputView;
-  }
-
-  public void setInputView(DataInputView inputView) {
-    this.inputView = inputView;
-  }
-
-  @Override
-  public int read() throws IOException {
-    try {
-      return inputView.readUnsignedByte();
-    } catch (EOFException e) {
-      // translate between DataInput and InputStream:
-      // DataInput signals EOF by exception, InputStream does it by returning -1
      return -1;
-    }
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    return inputView.read(b, off, len);
-  }
-}
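
The only real logic here is that end-of-input translation: DataInput signals EOF with an exception, InputStream with -1. A short sketch of the contract (someView stands in for whatever DataInputView Flink hands over):

    InputStream in = new DataInputViewWrapper(someView);
    int b;
    while ((b = in.read()) != -1) {
      // consume b; an EOFException from readUnsignedByte() became -1 here
    }
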
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
deleted file mode 100644
index 6bd2240..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Wrapper for {@link org.apache.flink.core.memory.DataOutputView}. We need this because
- * Flink writes data using a {@link org.apache.flink.core.memory.DataOutputView} while
- * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s expect an
- * {@link java.io.OutputStream}.
- */
-public class DataOutputViewWrapper extends OutputStream {
-
-  private DataOutputView outputView;
-
-  public DataOutputViewWrapper(DataOutputView outputView) {
-    this.outputView = outputView;
-  }
-
-  public void setOutputView(DataOutputView outputView) {
-    this.outputView = outputView;
-  }
-
-  @Override
-  public void write(int b) throws IOException {
-    outputView.write(b);
-  }
-
-  @Override
-  public void write(byte[] b, int off, int len) throws IOException {
-    outputView.write(b, off, len);
-  }
-}
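
The mirror-image sketch for the output side, letting a Dataflow Coder write straight into a Flink-managed view; outputView is assumed to be supplied by Flink:

    DataOutputViewWrapper out = new DataOutputViewWrapper(outputView);
    StringUtf8Coder.of().encode("hello", out, Coder.Context.NESTED);
    // The coder sees a plain OutputStream; every byte lands in outputView.
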
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
deleted file mode 100644
index 4409586..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.accumulators.Accumulator;
-
-import java.io.Serializable;
-
-/**
- * Wrapper that wraps a {@link com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn}
- * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using
- * the function as an aggregator in a {@link com.google.cloud.dataflow.sdk.transforms.ParDo}
- * operation.
- */
-public class SerializableFnAggregatorWrapper<AI, AO> implements Aggregator<AI, AO>, Accumulator<AI, Serializable> {
-
-  private AO aa;
-  private Combine.CombineFn<AI, ?, AO> combiner;
-
-  public SerializableFnAggregatorWrapper(Combine.CombineFn<AI, ?, AO> combiner) {
-    this.combiner = combiner;
-    resetLocal();
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void add(AI value) {
-    this.aa = combiner.apply(ImmutableList.of((AI) aa, value));
-  }
-
-  @Override
-  public Serializable getLocalValue() {
-    return (Serializable) aa;
-  }
-
-  @Override
-  public void resetLocal() {
-    this.aa = combiner.apply(ImmutableList.<AI>of());
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void merge(Accumulator<AI, Serializable> other) {
-    this.aa = combiner.apply(ImmutableList.of((AI) aa, (AI) other.getLocalValue()));
-  }
-
-  @Override
-  public void addValue(AI value) {
-    add(value);
-  }
-
-  @Override
-  public String getName() {
-    return "Aggregator: " + combiner.toString();
-  }
-
-  @Override
-  public Combine.CombineFn<AI, ?, AO> getCombineFn() {
-    return combiner;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public Accumulator<AI, Serializable> clone() {
-    // copy it by merging
-    AO resultCopy = combiner.apply(Lists.newArrayList((AI) aa));
-    SerializableFnAggregatorWrapper<AI, AO> result = new SerializableFnAggregatorWrapper<>(combiner);
-    result.aa = resultCopy;
-    return result;
-  }
-}
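
Note the design choice above: the wrapper keeps only the output value and feeds it back in as an input on every add, which implicitly assumes the combine function's input and output types coincide. With a hypothetical sumFn of type Combine.CombineFn<Integer, ?, Integer>:

    SerializableFnAggregatorWrapper<Integer, Integer> agg =
        new SerializableFnAggregatorWrapper<>(sumFn);
    agg.add(3);  // aa = sumFn.apply([0, 3]) -> 3
    agg.add(4);  // aa = sumFn.apply([3, 4]) -> 7
    // getLocalValue() -> 7; resetLocal() -> sumFn.apply([]) -> 0
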
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
deleted file mode 100644
index 4c2475d..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.cloud.dataflow.sdk.io.Sink;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.common.base.Preconditions;
-import com.google.cloud.dataflow.sdk.transforms.Write;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.AbstractID;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Field;
-
-/**
- * Wrapper class to use generic Write.Bound transforms as sinks.
- * @param <T> The type of the incoming records.
- */
-public class SinkOutputFormat<T> implements OutputFormat<T> {
-
-  private final Sink<T> sink;
-
-  private transient PipelineOptions pipelineOptions;
-
-  private Sink.WriteOperation<T, ?> writeOperation;
-  private Sink.Writer<T, ?> writer;
-
-  private AbstractID uid = new AbstractID();
-
-  public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) {
-    this.sink = extractSink(transform);
-    this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions);
-  }
-
-  private Sink<T> extractSink(Write.Bound<T> transform) {
-    // TODO possibly add a getter in the upstream
-    try {
-      Field sinkField = transform.getClass().getDeclaredField("sink");
-      sinkField.setAccessible(true);
-      @SuppressWarnings("unchecked")
-      Sink<T> extractedSink = (Sink<T>) sinkField.get(transform);
-      return extractedSink;
-    } catch (NoSuchFieldException | IllegalAccessException e) {
-      throw new RuntimeException("Could not acquire custom sink field.", e);
-    }
-  }
-
-  @Override
-  public void configure(Configuration configuration) {
-    writeOperation = sink.createWriteOperation(pipelineOptions);
-    try {
-      writeOperation.initialize(pipelineOptions);
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to initialize the write operation.", e);
-    }
-  }
-
-  @Override
-  public void open(int taskNumber, int numTasks) throws IOException {
-    try {
-      writer = writeOperation.createWriter(pipelineOptions);
-    } catch (Exception e) {
-      throw new IOException("Couldn't create writer.", e);
-    }
-    try {
-      writer.open(uid + "-" + taskNumber);
-    } catch (Exception e) {
-      throw new IOException("Couldn't open writer.", e);
-    }
-  }
-
-  @Override
-  public void writeRecord(T record) throws IOException {
-    try {
-      writer.write(record);
-    } catch (Exception e) {
-      throw new IOException("Couldn't write record.", e);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      writer.close();
-    } catch (Exception e) {
-      throw new IOException("Couldn't close writer.", e);
-    }
-  }
-
-  private void writeObject(ObjectOutputStream out) throws IOException {
-    out.defaultWriteObject();
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.writeValue(out, pipelineOptions);
-  }
-
-  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    ObjectMapper mapper = new ObjectMapper();
-    pipelineOptions = mapper.readValue(in, PipelineOptions.class);
-  }
-}
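
Finally, a sketch of the lifecycle Flink drives on each parallel task for this output format; writeTransform and options are assumed to come from the surrounding translation code:

    SinkOutputFormat<String> format =
        new SinkOutputFormat<>(writeTransform, options);
    format.configure(new Configuration()); // createWriteOperation + initialize
    format.open(0, 1);                     // createWriter, then writer.open(uid + "-0")
    format.writeRecord("hello");           // writer.write("hello")
    format.close();                        // writer.close()
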