From: iemejia@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Wed, 19 Apr 2017 13:09:12 -0000
Subject: [03/18] beam git commit: [BEAM-1994] Remove Flink examples package

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
new file mode 100644
index 0000000..2bf0bf1
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import com.google.common.collect.Iterators;
+import java.util.Collections;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.MapState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.SetState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+
+/**
+ * {@link StateInternals} that uses a Flink {@link OperatorStateBackend}
+ * to manage the split-distribute state.
+ *
+ *
<p>Elements in {@code ListState} will be redistributed in round-robin fashion
+ * to operators when restarting with a different parallelism.
+ *
+ *
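+ * <p>A bag cell here is simply backed by Flink operator {@code ListState}; the
+ * sketch below shows the mapping (the descriptor name and serializer are
+ * illustrative placeholders, not part of this patch):
+ * <pre>{@code
+ * ListStateDescriptor<T> descriptor =
+ *     new ListStateDescriptor<>("bag-state-id", serializer);
+ * stateBackend.getOperatorState(descriptor).add(element);  // BagState.add()
+ * Iterable<T> contents = stateBackend.getOperatorState(descriptor).get();
+ * }</pre>
+ *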
<p>
Note: + * Ignore index of key and namespace. + * Just implement BagState. + */ +public class FlinkSplitStateInternals implements StateInternals { + + private final OperatorStateBackend stateBackend; + + public FlinkSplitStateInternals(OperatorStateBackend stateBackend) { + this.stateBackend = stateBackend; + } + + @Override + public K getKey() { + return null; + } + + @Override + public T state( + final StateNamespace namespace, + StateTag address) { + + return state(namespace, address, StateContexts.nullContext()); + } + + @Override + public T state( + final StateNamespace namespace, + StateTag address, + final StateContext context) { + + return address.bind(new StateTag.StateBinder() { + + @Override + public ValueState bindValue( + StateTag> address, + Coder coder) { + throw new UnsupportedOperationException( + String.format("%s is not supported", ValueState.class.getSimpleName())); + } + + @Override + public BagState bindBag( + StateTag> address, + Coder elemCoder) { + + return new FlinkSplitBagState<>(stateBackend, address, namespace, elemCoder); + } + + @Override + public SetState bindSet( + StateTag> address, + Coder elemCoder) { + throw new UnsupportedOperationException( + String.format("%s is not supported", SetState.class.getSimpleName())); + } + + @Override + public MapState bindMap( + StateTag> spec, + Coder mapKeyCoder, Coder mapValueCoder) { + throw new UnsupportedOperationException( + String.format("%s is not supported", MapState.class.getSimpleName())); + } + + @Override + public + CombiningState + bindCombiningValue( + StateTag> address, + Coder accumCoder, + Combine.CombineFn combineFn) { + throw new UnsupportedOperationException("bindCombiningValue is not supported."); + } + + @Override + public + CombiningState bindKeyedCombiningValue( + StateTag> address, + Coder accumCoder, + final Combine.KeyedCombineFn combineFn) { + throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported."); + + } + + @Override + public + CombiningState bindKeyedCombiningValueWithContext( + StateTag> address, + Coder accumCoder, + CombineWithContext.KeyedCombineFnWithContext< + ? 
super K, InputT, AccumT, OutputT> combineFn) { + throw new UnsupportedOperationException( + "bindKeyedCombiningValueWithContext is not supported."); + } + + @Override + public WatermarkHoldState bindWatermark( + StateTag> address, + OutputTimeFn outputTimeFn) { + throw new UnsupportedOperationException( + String.format("%s is not supported", CombiningState.class.getSimpleName())); + } + }); + } + + private static class FlinkSplitBagState implements BagState { + + private final ListStateDescriptor descriptor; + private OperatorStateBackend flinkStateBackend; + private final StateNamespace namespace; + private final StateTag> address; + + FlinkSplitBagState( + OperatorStateBackend flinkStateBackend, + StateTag> address, + StateNamespace namespace, + Coder coder) { + this.flinkStateBackend = flinkStateBackend; + this.namespace = namespace; + this.address = address; + + CoderTypeInformation typeInfo = + new CoderTypeInformation<>(coder); + + descriptor = new ListStateDescriptor<>(address.getId(), + typeInfo.createSerializer(new ExecutionConfig())); + } + + @Override + public void add(T input) { + try { + flinkStateBackend.getOperatorState(descriptor).add(input); + } catch (Exception e) { + throw new RuntimeException("Error updating state.", e); + } + } + + @Override + public BagState readLater() { + return this; + } + + @Override + public Iterable read() { + try { + Iterable result = flinkStateBackend.getOperatorState(descriptor).get(); + return result != null ? result : Collections.emptyList(); + } catch (Exception e) { + throw new RuntimeException("Error updating state.", e); + } + } + + @Override + public ReadableState isEmpty() { + return new ReadableState() { + @Override + public Boolean read() { + try { + Iterable result = flinkStateBackend.getOperatorState(descriptor).get(); + // PartitionableListState.get() return empty collection When there is no element, + // KeyedListState different. 
(it returns null).
+            return result == null || Iterators.size(result.iterator()) == 0;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      try {
+        flinkStateBackend.getOperatorState(descriptor).clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error clearing state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkSplitBagState<?, ?> that = (FlinkSplitBagState<?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
new file mode 100644
index 0000000..4f961e5
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -0,0 +1,1053 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import com.google.common.collect.Lists;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.CombineContextFactory;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.MapState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.SetState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.joda.time.Instant;
+
+/**
+ * {@link StateInternals} that uses a Flink {@link KeyedStateBackend} to manage state.
+ *
+ *
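+ * <p>Keys are held by the backend in serialized form, as the note below
+ * describes; an illustrative round trip ({@code keyCoder} and {@code key} are
+ * placeholders):
+ * <pre>{@code
+ * ByteBuffer keyBytes =
+ *     ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+ * K decoded = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
+ * }</pre>
+ *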
<p>
Note: In the Flink streaming runner the key is always encoded + * using an {@link Coder} and stored in a {@link ByteBuffer}. + */ +public class FlinkStateInternals implements StateInternals { + + private final KeyedStateBackend flinkStateBackend; + private Coder keyCoder; + + // on recovery, these will no be properly set because we don't + // know which watermark hold states there are in the Flink State Backend + private final Map watermarkHolds = new HashMap<>(); + + public FlinkStateInternals(KeyedStateBackend flinkStateBackend, Coder keyCoder) { + this.flinkStateBackend = flinkStateBackend; + this.keyCoder = keyCoder; + } + + /** + * Returns the minimum over all watermark holds. + */ + public Instant watermarkHold() { + long min = Long.MAX_VALUE; + for (Instant hold: watermarkHolds.values()) { + min = Math.min(min, hold.getMillis()); + } + return new Instant(min); + } + + @Override + public K getKey() { + ByteBuffer keyBytes = flinkStateBackend.getCurrentKey(); + try { + return CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array()); + } catch (CoderException e) { + throw new RuntimeException("Error decoding key.", e); + } + } + + @Override + public T state( + final StateNamespace namespace, + StateTag address) { + + return state(namespace, address, StateContexts.nullContext()); + } + + @Override + public T state( + final StateNamespace namespace, + StateTag address, + final StateContext context) { + + return address.bind(new StateTag.StateBinder() { + + @Override + public ValueState bindValue( + StateTag> address, + Coder coder) { + + return new FlinkValueState<>(flinkStateBackend, address, namespace, coder); + } + + @Override + public BagState bindBag( + StateTag> address, + Coder elemCoder) { + + return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder); + } + + @Override + public SetState bindSet( + StateTag> address, + Coder elemCoder) { + throw new UnsupportedOperationException( + String.format("%s is not supported", SetState.class.getSimpleName())); + } + + @Override + public MapState bindMap( + StateTag> spec, + Coder mapKeyCoder, Coder mapValueCoder) { + throw new UnsupportedOperationException( + String.format("%s is not supported", MapState.class.getSimpleName())); + } + + @Override + public + CombiningState + bindCombiningValue( + StateTag> address, + Coder accumCoder, + Combine.CombineFn combineFn) { + + return new FlinkCombiningState<>( + flinkStateBackend, address, combineFn, namespace, accumCoder); + } + + @Override + public + CombiningState bindKeyedCombiningValue( + StateTag> address, + Coder accumCoder, + final Combine.KeyedCombineFn combineFn) { + return new FlinkKeyedCombiningState<>( + flinkStateBackend, + address, + combineFn, + namespace, + accumCoder, + FlinkStateInternals.this); + } + + @Override + public + CombiningState bindKeyedCombiningValueWithContext( + StateTag> address, + Coder accumCoder, + CombineWithContext.KeyedCombineFnWithContext< + ? 
super K, InputT, AccumT, OutputT> combineFn) { + return new FlinkCombiningStateWithContext<>( + flinkStateBackend, + address, + combineFn, + namespace, + accumCoder, + FlinkStateInternals.this, + CombineContextFactory.createFromStateContext(context)); + } + + @Override + public WatermarkHoldState bindWatermark( + StateTag> address, + OutputTimeFn outputTimeFn) { + + return new FlinkWatermarkHoldState<>( + flinkStateBackend, FlinkStateInternals.this, address, namespace, outputTimeFn); + } + }); + } + + private static class FlinkValueState implements ValueState { + + private final StateNamespace namespace; + private final StateTag> address; + private final ValueStateDescriptor flinkStateDescriptor; + private final KeyedStateBackend flinkStateBackend; + + FlinkValueState( + KeyedStateBackend flinkStateBackend, + StateTag> address, + StateNamespace namespace, + Coder coder) { + + this.namespace = namespace; + this.address = address; + this.flinkStateBackend = flinkStateBackend; + + CoderTypeInformation typeInfo = new CoderTypeInformation<>(coder); + + flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); + } + + @Override + public void write(T input) { + try { + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).update(input); + } catch (Exception e) { + throw new RuntimeException("Error updating state.", e); + } + } + + @Override + public ValueState readLater() { + return this; + } + + @Override + public T read() { + try { + return flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).value(); + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + } + + @Override + public void clear() { + try { + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).clear(); + } catch (Exception e) { + throw new RuntimeException("Error clearing state.", e); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FlinkValueState that = (FlinkValueState) o; + + return namespace.equals(that.namespace) && address.equals(that.address); + + } + + @Override + public int hashCode() { + int result = namespace.hashCode(); + result = 31 * result + address.hashCode(); + return result; + } + } + + private static class FlinkBagState implements BagState { + + private final StateNamespace namespace; + private final StateTag> address; + private final ListStateDescriptor flinkStateDescriptor; + private final KeyedStateBackend flinkStateBackend; + + FlinkBagState( + KeyedStateBackend flinkStateBackend, + StateTag> address, + StateNamespace namespace, + Coder coder) { + + this.namespace = namespace; + this.address = address; + this.flinkStateBackend = flinkStateBackend; + + CoderTypeInformation typeInfo = new CoderTypeInformation<>(coder); + + flinkStateDescriptor = new ListStateDescriptor<>(address.getId(), typeInfo); + } + + @Override + public void add(T input) { + try { + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).add(input); + } catch (Exception e) { + throw new RuntimeException("Error adding to bag state.", e); + } + } + + @Override + public BagState readLater() { + return this; + } + + @Override + public Iterable read() { + try { + Iterable result = flinkStateBackend.getPartitionedState( 
+ namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).get(); + + return result != null ? result : Collections.emptyList(); + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + } + + @Override + public ReadableState isEmpty() { + return new ReadableState() { + @Override + public Boolean read() { + try { + Iterable result = flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).get(); + return result == null; + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + + } + + @Override + public ReadableState readLater() { + return this; + } + }; + } + + @Override + public void clear() { + try { + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).clear(); + } catch (Exception e) { + throw new RuntimeException("Error clearing state.", e); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FlinkBagState that = (FlinkBagState) o; + + return namespace.equals(that.namespace) && address.equals(that.address); + + } + + @Override + public int hashCode() { + int result = namespace.hashCode(); + result = 31 * result + address.hashCode(); + return result; + } + } + + private static class FlinkCombiningState + implements CombiningState { + + private final StateNamespace namespace; + private final StateTag> address; + private final Combine.CombineFn combineFn; + private final ValueStateDescriptor flinkStateDescriptor; + private final KeyedStateBackend flinkStateBackend; + + FlinkCombiningState( + KeyedStateBackend flinkStateBackend, + StateTag> address, + Combine.CombineFn combineFn, + StateNamespace namespace, + Coder accumCoder) { + + this.namespace = namespace; + this.address = address; + this.combineFn = combineFn; + this.flinkStateBackend = flinkStateBackend; + + CoderTypeInformation typeInfo = new CoderTypeInformation<>(accumCoder); + + flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); + } + + @Override + public CombiningState readLater() { + return this; + } + + @Override + public void add(InputT value) { + try { + org.apache.flink.api.common.state.ValueState state = + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor); + + AccumT current = state.value(); + if (current == null) { + current = combineFn.createAccumulator(); + } + current = combineFn.addInput(current, value); + state.update(current); + } catch (Exception e) { + throw new RuntimeException("Error adding to state." 
, e); + } + } + + @Override + public void addAccum(AccumT accum) { + try { + org.apache.flink.api.common.state.ValueState state = + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor); + + AccumT current = state.value(); + if (current == null) { + state.update(accum); + } else { + current = combineFn.mergeAccumulators(Lists.newArrayList(current, accum)); + state.update(current); + } + } catch (Exception e) { + throw new RuntimeException("Error adding to state.", e); + } + } + + @Override + public AccumT getAccum() { + try { + return flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).value(); + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + } + + @Override + public AccumT mergeAccumulators(Iterable accumulators) { + return combineFn.mergeAccumulators(accumulators); + } + + @Override + public OutputT read() { + try { + org.apache.flink.api.common.state.ValueState state = + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor); + + AccumT accum = state.value(); + if (accum != null) { + return combineFn.extractOutput(accum); + } else { + return combineFn.extractOutput(combineFn.createAccumulator()); + } + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + } + + @Override + public ReadableState isEmpty() { + return new ReadableState() { + @Override + public Boolean read() { + try { + return flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).value() == null; + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + + } + + @Override + public ReadableState readLater() { + return this; + } + }; + } + + @Override + public void clear() { + try { + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).clear(); + } catch (Exception e) { + throw new RuntimeException("Error clearing state.", e); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FlinkCombiningState that = + (FlinkCombiningState) o; + + return namespace.equals(that.namespace) && address.equals(that.address); + + } + + @Override + public int hashCode() { + int result = namespace.hashCode(); + result = 31 * result + address.hashCode(); + return result; + } + } + + private static class FlinkKeyedCombiningState + implements CombiningState { + + private final StateNamespace namespace; + private final StateTag> address; + private final Combine.KeyedCombineFn combineFn; + private final ValueStateDescriptor flinkStateDescriptor; + private final KeyedStateBackend flinkStateBackend; + private final FlinkStateInternals flinkStateInternals; + + FlinkKeyedCombiningState( + KeyedStateBackend flinkStateBackend, + StateTag> address, + Combine.KeyedCombineFn combineFn, + StateNamespace namespace, + Coder accumCoder, + FlinkStateInternals flinkStateInternals) { + + this.namespace = namespace; + this.address = address; + this.combineFn = combineFn; + this.flinkStateBackend = flinkStateBackend; + this.flinkStateInternals = flinkStateInternals; + + CoderTypeInformation typeInfo = new CoderTypeInformation<>(accumCoder); + + flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); + } + + 
@Override + public CombiningState readLater() { + return this; + } + + @Override + public void add(InputT value) { + try { + org.apache.flink.api.common.state.ValueState state = + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor); + + AccumT current = state.value(); + if (current == null) { + current = combineFn.createAccumulator(flinkStateInternals.getKey()); + } + current = combineFn.addInput(flinkStateInternals.getKey(), current, value); + state.update(current); + } catch (Exception e) { + throw new RuntimeException("Error adding to state." , e); + } + } + + @Override + public void addAccum(AccumT accum) { + try { + org.apache.flink.api.common.state.ValueState state = + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor); + + AccumT current = state.value(); + if (current == null) { + state.update(accum); + } else { + current = combineFn.mergeAccumulators( + flinkStateInternals.getKey(), + Lists.newArrayList(current, accum)); + state.update(current); + } + } catch (Exception e) { + throw new RuntimeException("Error adding to state.", e); + } + } + + @Override + public AccumT getAccum() { + try { + return flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).value(); + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + } + + @Override + public AccumT mergeAccumulators(Iterable accumulators) { + return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators); + } + + @Override + public OutputT read() { + try { + org.apache.flink.api.common.state.ValueState state = + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor); + + AccumT accum = state.value(); + if (accum != null) { + return combineFn.extractOutput(flinkStateInternals.getKey(), accum); + } else { + return combineFn.extractOutput( + flinkStateInternals.getKey(), + combineFn.createAccumulator(flinkStateInternals.getKey())); + } + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + } + + @Override + public ReadableState isEmpty() { + return new ReadableState() { + @Override + public Boolean read() { + try { + return flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).value() == null; + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + + } + + @Override + public ReadableState readLater() { + return this; + } + }; + } + + @Override + public void clear() { + try { + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).clear(); + } catch (Exception e) { + throw new RuntimeException("Error clearing state.", e); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FlinkKeyedCombiningState that = + (FlinkKeyedCombiningState) o; + + return namespace.equals(that.namespace) && address.equals(that.address); + + } + + @Override + public int hashCode() { + int result = namespace.hashCode(); + result = 31 * result + address.hashCode(); + return result; + } + } + + private static class FlinkCombiningStateWithContext + implements CombiningState { + + private final StateNamespace namespace; + private final StateTag> address; + private 
final CombineWithContext.KeyedCombineFnWithContext< + ? super K, InputT, AccumT, OutputT> combineFn; + private final ValueStateDescriptor flinkStateDescriptor; + private final KeyedStateBackend flinkStateBackend; + private final FlinkStateInternals flinkStateInternals; + private final CombineWithContext.Context context; + + FlinkCombiningStateWithContext( + KeyedStateBackend flinkStateBackend, + StateTag> address, + CombineWithContext.KeyedCombineFnWithContext< + ? super K, InputT, AccumT, OutputT> combineFn, + StateNamespace namespace, + Coder accumCoder, + FlinkStateInternals flinkStateInternals, + CombineWithContext.Context context) { + + this.namespace = namespace; + this.address = address; + this.combineFn = combineFn; + this.flinkStateBackend = flinkStateBackend; + this.flinkStateInternals = flinkStateInternals; + this.context = context; + + CoderTypeInformation typeInfo = new CoderTypeInformation<>(accumCoder); + + flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); + } + + @Override + public CombiningState readLater() { + return this; + } + + @Override + public void add(InputT value) { + try { + org.apache.flink.api.common.state.ValueState state = + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor); + + AccumT current = state.value(); + if (current == null) { + current = combineFn.createAccumulator(flinkStateInternals.getKey(), context); + } + current = combineFn.addInput(flinkStateInternals.getKey(), current, value, context); + state.update(current); + } catch (Exception e) { + throw new RuntimeException("Error adding to state." , e); + } + } + + @Override + public void addAccum(AccumT accum) { + try { + org.apache.flink.api.common.state.ValueState state = + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor); + + AccumT current = state.value(); + if (current == null) { + state.update(accum); + } else { + current = combineFn.mergeAccumulators( + flinkStateInternals.getKey(), + Lists.newArrayList(current, accum), + context); + state.update(current); + } + } catch (Exception e) { + throw new RuntimeException("Error adding to state.", e); + } + } + + @Override + public AccumT getAccum() { + try { + return flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).value(); + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + } + + @Override + public AccumT mergeAccumulators(Iterable accumulators) { + return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators, context); + } + + @Override + public OutputT read() { + try { + org.apache.flink.api.common.state.ValueState state = + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor); + + AccumT accum = state.value(); + return combineFn.extractOutput(flinkStateInternals.getKey(), accum, context); + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + } + + @Override + public ReadableState isEmpty() { + return new ReadableState() { + @Override + public Boolean read() { + try { + return flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).value() == null; + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + + } + + @Override + public ReadableState readLater() { + return this; 
+ } + }; + } + + @Override + public void clear() { + try { + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).clear(); + } catch (Exception e) { + throw new RuntimeException("Error clearing state.", e); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FlinkCombiningStateWithContext that = + (FlinkCombiningStateWithContext) o; + + return namespace.equals(that.namespace) && address.equals(that.address); + + } + + @Override + public int hashCode() { + int result = namespace.hashCode(); + result = 31 * result + address.hashCode(); + return result; + } + } + + private static class FlinkWatermarkHoldState + implements WatermarkHoldState { + private final StateTag> address; + private final OutputTimeFn outputTimeFn; + private final StateNamespace namespace; + private final KeyedStateBackend flinkStateBackend; + private final FlinkStateInternals flinkStateInternals; + private final ValueStateDescriptor flinkStateDescriptor; + + public FlinkWatermarkHoldState( + KeyedStateBackend flinkStateBackend, + FlinkStateInternals flinkStateInternals, + StateTag> address, + StateNamespace namespace, + OutputTimeFn outputTimeFn) { + this.address = address; + this.outputTimeFn = outputTimeFn; + this.namespace = namespace; + this.flinkStateBackend = flinkStateBackend; + this.flinkStateInternals = flinkStateInternals; + + CoderTypeInformation typeInfo = new CoderTypeInformation<>(InstantCoder.of()); + flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); + } + + @Override + public OutputTimeFn getOutputTimeFn() { + return outputTimeFn; + } + + @Override + public WatermarkHoldState readLater() { + return this; + } + + @Override + public ReadableState isEmpty() { + return new ReadableState() { + @Override + public Boolean read() { + try { + return flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor).value() == null; + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + } + + @Override + public ReadableState readLater() { + return this; + } + }; + + } + + @Override + public void add(Instant value) { + try { + org.apache.flink.api.common.state.ValueState state = + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor); + + Instant current = state.value(); + if (current == null) { + state.update(value); + flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value); + } else { + Instant combined = outputTimeFn.combine(current, value); + state.update(combined); + flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined); + } + } catch (Exception e) { + throw new RuntimeException("Error updating state.", e); + } + } + + @Override + public Instant read() { + try { + org.apache.flink.api.common.state.ValueState state = + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + flinkStateDescriptor); + return state.value(); + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + } + + @Override + public void clear() { + flinkStateInternals.watermarkHolds.remove(namespace.stringKey()); + try { + org.apache.flink.api.common.state.ValueState state = + flinkStateBackend.getPartitionedState( + namespace.stringKey(), + StringSerializer.INSTANCE, + 
flinkStateDescriptor); + state.clear(); + } catch (Exception e) { + throw new RuntimeException("Error reading state.", e); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FlinkWatermarkHoldState that = (FlinkWatermarkHoldState) o; + + if (!address.equals(that.address)) { + return false; + } + if (!outputTimeFn.equals(that.outputTimeFn)) { + return false; + } + return namespace.equals(that.namespace); + + } + + @Override + public int hashCode() { + int result = address.hashCode(); + result = 31 * result + outputTimeFn.hashCode(); + result = 31 * result + namespace.hashCode(); + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java new file mode 100644 index 0000000..b38a520 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.state; + +import java.io.DataOutputStream; + +/** + * This interface is used to checkpoint key-groups state. + */ +public interface KeyGroupCheckpointedOperator extends KeyGroupRestoringOperator{ + /** + * Snapshots the state for a given {@code keyGroupIdx}. + * + *
<p>
AbstractStreamOperator would call this hook in
+ * AbstractStreamOperator.snapshotState() while iterating over the key groups.
+ * @param keyGroupIndex the id of the key group to be put in the snapshot.
+ * @param out the stream to write to.
+ */
+  void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
new file mode 100644
index 0000000..2bdfc6e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import java.io.DataInputStream;
+
+/**
+ * This interface is used to restore key-group state.
+ */
+public interface KeyGroupRestoringOperator {
+  /**
+   * Restores the state for a given {@code keyGroupIndex}.
+   * @param keyGroupIndex the id of the key group whose state is to be restored.
+   * @param in the stream to read from.
+   */
+  void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
new file mode 100644
index 0000000..0004e9e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal state implementation of the Beam runner for Apache Flink. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.state; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/resources/log4j.properties b/runners/flink/src/main/resources/log4j.properties new file mode 100644 index 0000000..4b6a708 --- /dev/null +++ b/runners/flink/src/main/resources/log4j.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=OFF,console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java new file mode 100644 index 0000000..10d6d9d --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.runners.flink.translation.types.EncodedValueComparator; +import org.apache.beam.runners.flink.translation.types.EncodedValueTypeInformation; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.junit.Assert; + +/** + * Test for {@link EncodedValueComparator}. + */ +public class EncodedValueComparatorTest extends ComparatorTestBase { + + @Override + protected TypeComparator createComparator(boolean ascending) { + return new EncodedValueTypeInformation().createComparator(ascending, new ExecutionConfig()); + } + + @Override + protected TypeSerializer createSerializer() { + return new EncodedValueTypeInformation().createSerializer(new ExecutionConfig()); + } + + @Override + protected void deepEquals(String message, byte[] should, byte[] is) { + Assert.assertArrayEquals(message, should, is); + } + + @Override + protected byte[][] getSortedTestData() { + StringUtf8Coder coder = StringUtf8Coder.of(); + + try { + return new byte[][]{ + CoderUtils.encodeToByteArray(coder, ""), + CoderUtils.encodeToByteArray(coder, "Lorem Ipsum Dolor Omit Longer"), + CoderUtils.encodeToByteArray(coder, "aaaa"), + CoderUtils.encodeToByteArray(coder, "abcd"), + CoderUtils.encodeToByteArray(coder, "abce"), + CoderUtils.encodeToByteArray(coder, "abdd"), + CoderUtils.encodeToByteArray(coder, "accd"), + CoderUtils.encodeToByteArray(coder, "bbcd") + }; + } catch (CoderException e) { + throw new RuntimeException("Could not encode values.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java new file mode 100644 index 0000000..d9d174c --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.flink; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; + +/** + * Tests the proper registration of the Flink runner. + */ +public class FlinkRunnerRegistrarTest { + + @Test + public void testFullName() { + String[] args = + new String[] {String.format("--runner=%s", FlinkRunner.class.getName())}; + PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create(); + assertEquals(opts.getRunner(), FlinkRunner.class); + } + + @Test + public void testClassName() { + String[] args = + new String[] {String.format("--runner=%s", FlinkRunner.class.getSimpleName())}; + PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create(); + assertEquals(opts.getRunner(), FlinkRunner.class); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java new file mode 100644 index 0000000..d6240c4 --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.PipelineRunner; + +/** + * {@link org.apache.beam.sdk.Pipeline} for testing Dataflow programs on the + * {@link FlinkRunner}. + */ +public class FlinkTestPipeline extends Pipeline { + + /** + * Creates and returns a new test pipeline for batch execution. + * + *
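+ * <p>A typical use, sketched with placeholder data (see also the next
+ * paragraph):
+ * <pre>{@code
+ * FlinkTestPipeline p = FlinkTestPipeline.createForBatch();
+ * PCollection<String> out = p.apply(Create.of("a", "b"));
+ * PAssert.that(out).containsInAnyOrder("a", "b");
+ * p.run();
+ * }</pre>
+ *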
<p>
Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call + * {@link Pipeline#run} to execute the pipeline and check the tests. + */ + public static FlinkTestPipeline createForBatch() { + return create(false); + } + + /** + * Creates and returns a new test pipeline for streaming execution. + * + *
<p>
Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call + * {@link Pipeline#run} to execute the pipeline and check the tests. + * + * @return The Test Pipeline + */ + public static FlinkTestPipeline createForStreaming() { + return create(true); + } + + /** + * Creates and returns a new test pipeline for streaming or batch execution. + * + *
<p>
Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call + * {@link Pipeline#run} to execute the pipeline and check the tests. + * + * @param streaming True for streaming mode, False for batch. + * @return The Test Pipeline. + */ + private static FlinkTestPipeline create(boolean streaming) { + TestFlinkRunner flinkRunner = TestFlinkRunner.create(streaming); + return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions()); + } + + private FlinkTestPipeline(PipelineRunner runner, + PipelineOptions options) { + super(runner, options); + } +} + http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java new file mode 100644 index 0000000..06187f6 --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.HashMap; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.commons.lang3.SerializationUtils; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.joda.time.Instant; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests for serialization and deserialization of {@link PipelineOptions} in {@link DoFnOperator}. + */ +public class PipelineOptionsTest { + + /** + * Pipeline options. 
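+ * <p>Values arrive as command-line flags, e.g. {@code --testOption=nothing}
+ * below; a minimal parse, sketched here for illustration:
+ * <pre>{@code
+ * MyOptions opts = PipelineOptionsFactory
+ *     .fromArgs("--testOption=nothing").as(MyOptions.class);
+ * }</pre>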
+ */ + public interface MyOptions extends FlinkPipelineOptions { + @Description("Bla bla bla") + @Default.String("Hello") + String getTestOption(); + void setTestOption(String value); + } + + private static MyOptions options; + private static SerializedPipelineOptions serializedOptions; + + private static final String[] args = new String[]{"--testOption=nothing"}; + + @BeforeClass + public static void beforeTest() { + options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class); + serializedOptions = new SerializedPipelineOptions(options); + } + + @Test + public void testDeserialization() { + MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class); + assertEquals("nothing", deserializedOptions.getTestOption()); + } + + @Test + public void testIgnoredFieldSerialization() { + FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + options.setStateBackend(new MemoryStateBackend()); + + FlinkPipelineOptions deserialized = + new SerializedPipelineOptions(options).getPipelineOptions().as(FlinkPipelineOptions.class); + + assertNull(deserialized.getStateBackend()); + } + + @Test + public void testCaching() { + PipelineOptions deserializedOptions = + serializedOptions.getPipelineOptions().as(PipelineOptions.class); + + assertNotNull(deserializedOptions); + assertTrue(deserializedOptions == serializedOptions.getPipelineOptions()); + assertTrue(deserializedOptions == serializedOptions.getPipelineOptions()); + assertTrue(deserializedOptions == serializedOptions.getPipelineOptions()); + } + + @Test(expected = Exception.class) + public void testNonNull() { + new SerializedPipelineOptions(null); + } + + @Test(expected = Exception.class) + public void parDoBaseClassPipelineOptionsNullTest() { + DoFnOperator doFnOperator = new DoFnOperator<>( + new TestDoFn(), + WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), + new TupleTag("main-output"), + Collections.>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory(), + WindowingStrategy.globalDefault(), + new HashMap>(), + Collections.>emptyList(), + null, + null); + + } + + /** + * Tests that PipelineOptions are present after serialization. 
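+ * <p>A minimal round trip through the holder used by these tests
+ * (illustrative):
+ * <pre>{@code
+ * SerializedPipelineOptions holder = new SerializedPipelineOptions(options);
+ * MyOptions back = holder.getPipelineOptions().as(MyOptions.class);
+ * }</pre>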
+
+  /**
+   * Tests that PipelineOptions are present after serialization.
+   */
+  @Test
+  public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
+
+    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+        new TestDoFn(),
+        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
+        new TupleTag<String>("main-output"),
+        Collections.<TupleTag<?>>emptyList(),
+        new DoFnOperator.DefaultOutputManagerFactory<String>(),
+        WindowingStrategy.globalDefault(),
+        new HashMap<Integer, PCollectionView<?>>(),
+        Collections.<PCollectionView<?>>emptyList(),
+        options,
+        null);
+
+    final byte[] serialized = SerializationUtils.serialize(doFnOperator);
+
+    @SuppressWarnings("unchecked")
+    DoFnOperator<Object, Object, Object> deserialized =
+        (DoFnOperator<Object, Object, Object>) SerializationUtils.deserialize(serialized);
+
+    TypeInformation<WindowedValue<Object>> typeInformation = TypeInformation.of(
+        new TypeHint<WindowedValue<Object>>() {});
+
+    OneInputStreamOperatorTestHarness<WindowedValue<Object>, Object> testHarness =
+        new OneInputStreamOperatorTestHarness<>(deserialized,
+            typeInformation.createSerializer(new ExecutionConfig()));
+
+    testHarness.open();
+
+    // execute once to access options
+    testHarness.processElement(new StreamRecord<>(
+        WindowedValue.of(
+            new Object(),
+            Instant.now(),
+            GlobalWindow.INSTANCE,
+            PaneInfo.NO_FIRING)));
+
+    testHarness.close();
+  }
+
+  private static class TestDoFn extends DoFn<String, String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      Assert.assertNotNull(c.getPipelineOptions());
+      Assert.assertEquals(
+          options.getTestOption(),
+          c.getPipelineOptions().as(MyOptions.class).getTestOption());
+    }
+  }
+}
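
[Editor's note: the round trip exercised by the tests above can be reduced to a few lines. A minimal sketch, assuming the MyOptions interface defined in the test; SerializedPipelineOptions keeps a serialized copy of the options internally and rebuilds them on first access, which is what testCaching relies on.]

    // Sketch of the round trip exercised by PipelineOptionsTest.
    // MyOptions is the test interface defined above.
    MyOptions original = PipelineOptionsFactory
        .fromArgs(new String[] {"--testOption=nothing"})
        .as(MyOptions.class);

    SerializedPipelineOptions wrapper = new SerializedPipelineOptions(original);

    // Java-serialize the wrapper, as happens when the runner ships operators.
    byte[] bytes = SerializationUtils.serialize(wrapper);
    SerializedPipelineOptions restored = SerializationUtils.deserialize(bytes);

    // The restored wrapper rebuilds the options and then caches the instance.
    MyOptions roundTripped = restored.getPipelineOptions().as(MyOptions.class);
    assert "nothing".equals(roundTripped.getTestOption());
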
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
new file mode 100644
index 0000000..44c9017
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.base.Joiner;
+import java.io.File;
+import java.net.URI;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Reads from a bounded source in batch execution.
+ */
+public class ReadSourceITCase extends JavaProgramTestBase {
+
+  protected String resultPath;
+
+  public ReadSourceITCase() {
+  }
+
+  private static final String[] EXPECTED_RESULT = new String[] {
+      "0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+
+    // need to create the dir beforehand, otherwise Beam sinks
+    // don't work for these tests
+    if (!new File(new URI(resultPath)).mkdirs()) {
+      throw new RuntimeException("Could not create output dir.");
+    }
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  private static void runProgram(String resultPath) throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    PCollection<String> result = p
+        .apply(CountingInput.upTo(10))
+        .apply(ParDo.of(new DoFn<Long, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) throws Exception {
+            c.output(c.element().toString());
+          }
+        }));
+
+    result.apply(TextIO.Write.to(new URI(resultPath).getPath() + "/part"));
+
+    p.run();
+  }
+}
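
[Editor's note: outside the JavaProgramTestBase harness, the same pipeline can be run directly against the Flink runner. A minimal sketch, assuming the FlinkRunner class from this runner module; the output path is a placeholder.]

    // The pipeline from ReadSourceITCase, run without the test harness.
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);

    Pipeline p = Pipeline.create(options);
    p.apply(CountingInput.upTo(10))
        .apply(ParDo.of(new DoFn<Long, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(c.element().toString());
          }
        }))
        .apply(TextIO.Write.to("/tmp/read-source-output/part"));  // placeholder path
    p.run();
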
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
new file mode 100644
index 0000000..79b7882
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.base.Joiner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+/**
+ * Reads from a bounded source in streaming execution.
+ */
+public class ReadSourceStreamingITCase extends StreamingProgramTestBase {
+
+  protected String resultPath;
+
+  public ReadSourceStreamingITCase() {
+  }
+
+  private static final String[] EXPECTED_RESULT = new String[] {
+      "0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  private static void runProgram(String resultPath) {
+
+    Pipeline p = FlinkTestPipeline.createForStreaming();
+
+    p
+        .apply(CountingInput.upTo(10))
+        .apply(ParDo.of(new DoFn<Long, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) throws Exception {
+            c.output(c.element().toString());
+          }
+        }))
+        .apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
+}
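
[Editor's note: the only difference from the batch case is the pipeline factory method. A sketch of the equivalent manual configuration, assuming FlinkPipelineOptions exposes the streaming flag via a setStreaming(boolean) accessor.]

    // Selecting streaming execution by hand instead of using
    // FlinkTestPipeline.createForStreaming().
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setStreaming(true);  // assumed accessor; see note above

    Pipeline p = Pipeline.create(options);
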
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
new file mode 100644
index 0000000..38b790e
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import static org.junit.Assert.assertNotNull;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Tests the translation of custom Write sinks.
+ */
+public class WriteSinkITCase extends JavaProgramTestBase {
+
+  protected String resultPath;
+
+  public WriteSinkITCase() {
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "Joe red 3", "Mary blue 4", "Max yellow 23"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result-" + System.nanoTime());
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  @Override
+  public void stopCluster() throws Exception {
+    try {
+      super.stopCluster();
+    } catch (final IOException ioe) {
+      if (ioe.getMessage().startsWith("Unable to delete file")) {
+        // that's OK for the test itself; it's just the OS interfering
+        // with the cleanup phase
+      }
+    }
+  }
+
+  private static void runProgram(String resultPath) {
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    p.apply(Create.of(ImmutableList.copyOf(EXPECTED_RESULT))).setCoder(StringUtf8Coder.of())
+        .apply("CustomSink", Write.to(new MyCustomSink(resultPath)));
+
+    p.run();
+  }
+
+  /**
+   * Simple custom sink which writes to a file.
+   */
+  private static class MyCustomSink extends Sink<String> {
+
+    private final String resultPath;
+
+    public MyCustomSink(String resultPath) {
+      this.resultPath = resultPath;
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      assertNotNull(options);
+    }
+
+    @Override
+    public WriteOperation<String, String> createWriteOperation(PipelineOptions options) {
+      return new MyWriteOperation();
+    }
+
+    private class MyWriteOperation extends WriteOperation<String, String> {
+
+      @Override
+      public Coder<String> getWriterResultCoder() {
+        return StringUtf8Coder.of();
+      }
+
+      @Override
+      public void initialize(PipelineOptions options) throws Exception {
+      }
+
+      @Override
+      public void setWindowedWrites(boolean windowedWrites) {
+      }
+
+      @Override
+      public void finalize(Iterable<String> writerResults, PipelineOptions options)
+          throws Exception {
+      }
+
+      @Override
+      public Writer<String, String> createWriter(PipelineOptions options) throws Exception {
+        return new MyWriter();
+      }
+
+      @Override
+      public Sink<String> getSink() {
+        return MyCustomSink.this;
+      }
+
+      /**
+       * Simple Writer which writes to a file.
+       */
+      private class MyWriter extends Writer<String, String> {
+
+        private PrintWriter internalWriter;
+
+        @Override
+        public final void openWindowed(String uId,
+                                       BoundedWindow window,
+                                       PaneInfo paneInfo,
+                                       int shard,
+                                       int numShards) throws Exception {
+          throw new UnsupportedOperationException("Windowed writes not supported.");
+        }
+
+        @Override
+        public final void openUnwindowed(String uId, int shard, int numShards) throws Exception {
+          Path path = new Path(resultPath + "/" + uId);
+          FileSystem.get(new URI("file:///")).create(path, false);
+          internalWriter = new PrintWriter(new File(path.toUri()));
+        }
+
+        @Override
+        public void cleanup() throws Exception {
+        }
+
+        @Override
+        public void write(String value) throws Exception {
+          internalWriter.println(value);
+        }
+
+        @Override
+        public String close() throws Exception {
+          internalWriter.close();
+          return resultPath;
+        }
+
+        @Override
+        public WriteOperation<String, String> getWriteOperation() {
+          return MyWriteOperation.this;
+        }
+      }
+    }
+  }
+}
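
[Editor's note: for orientation, the call order that the Write transform drives against such a sink is roughly the following. MyCustomSink is private to the test above, so the snippet is purely illustrative; the path and bundle identifiers are placeholders, and java.util.Collections is assumed imported.]

    // Approximate lifecycle the Write transform drives for a Sink
    // like MyCustomSink above.
    PipelineOptions options = PipelineOptionsFactory.create();
    Sink<String> sink = new MyCustomSink("/tmp/write-sink-result");  // placeholder path

    sink.validate(options);                                   // fail fast on bad config
    Sink.WriteOperation<String, String> op = sink.createWriteOperation(options);
    op.initialize(options);                                   // one-time global setup

    Sink.Writer<String, String> writer = op.createWriter(options);
    writer.openUnwindowed("bundle-0", 0, 1);                  // one writer per bundle
    writer.write("Joe red 3");
    String writerResult = writer.close();                     // per-writer result

    op.finalize(Collections.singletonList(writerResult), options);  // commit all results
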