From: tgroh@apache.org
To: commits@beam.apache.org
Date: Mon, 08 May 2017 23:41:44 -0000
Subject: [2/2] beam git commit: [BEAM-1723] deduplication of UnboundedSource in Flink runner

[BEAM-1723] deduplication of UnboundedSource in Flink runner

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/392ed601
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/392ed601
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/392ed601

Branch: refs/heads/release-2.0.0
Commit: 392ed601392dbf5ace32577c3a4dee13488cedc4
Parents: 5493c6c
Author: JingsongLi
Authored: Wed Apr 19 19:42:59 2017 +0800
Committer: Thomas Groh
Committed: Mon May 8 16:41:25 2017 -0700

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     |  54 +++++-
 .../wrappers/streaming/io/DedupingOperator.java | 187 +++++++++++++++++++
 .../streaming/io/UnboundedSourceWrapper.java    |  15 +-
 .../flink/streaming/DedupingOperatorTest.java   | 131 +++++++++++++
 .../streaming/UnboundedSourceWrapperTest.java   |  29 +--
 5 files changed, 393 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
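For context before the diff: the patch tags every element emitted by an unbounded
source with the record id assigned by the reader, using the SDK's ValueWithRecordId
wrapper, and strips the id again only after a keyed de-duplication step (or
immediately, when the source does not require deduping). A minimal sketch of the
wrapper's semantics -- the class and its methods are from the Beam SDK, but the
RecordIdDemo harness and the literal values are illustrative, not part of the patch:

    import org.apache.beam.sdk.values.ValueWithRecordId;

    public class RecordIdDemo {
      public static void main(String[] args) {
        // The source wrapper emits the payload together with the reader-assigned id...
        ValueWithRecordId<String> tagged =
            new ValueWithRecordId<>("payload", "id-1".getBytes());
        // ...the deduping stage keys on getId(); downstream transforms only ever
        // see the unwrapped getValue().
        System.out.println(new String(tagged.getId()) + " -> " + tagged.getValue());
      }
    }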
http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 615eaea..9a93205 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -44,6 +44,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDo
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -73,12 +74,16 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -148,20 +153,37 @@ class FlinkStreamingTransformTranslators {
         FlinkStreamingTranslationContext context) {
       PCollection<T> output = context.getOutput(transform);
 
+      DataStream<WindowedValue<T>> source;
+      DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
       TypeInformation<WindowedValue<T>> outputTypeInfo =
           context.getTypeInfo(context.getOutput(transform));
 
-      DataStream<WindowedValue<T>> source;
+      Coder<T> coder = context.getOutput(transform).getCoder();
+
+      TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
+          new CoderTypeInformation<>(WindowedValue.getFullCoder(
+              ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
+              output.getWindowingStrategy().getWindowFn().windowCoder()));
+
       try {
+        UnboundedSourceWrapper<T, ?> sourceWrapper =
             new UnboundedSourceWrapper<>(
                 context.getCurrentTransform().getFullName(),
                 context.getPipelineOptions(),
                 transform.getSource(),
                 context.getExecutionEnvironment().getParallelism());
-        source = context
+        nonDedupSource = context
             .getExecutionEnvironment()
-            .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
+            .addSource(sourceWrapper).name(transform.getName()).returns(withIdTypeInfo);
+
+        if (transform.getSource().requiresDeduping()) {
+          source = nonDedupSource.keyBy(
+              new ValueWithRecordIdKeySelector<T>())
+              .transform("deduping", outputTypeInfo, new DedupingOperator<T>());
+        } else {
+          source = nonDedupSource.flatMap(new StripIdsMap<T>());
+        }
       } catch (Exception e) {
         throw new RuntimeException(
             "Error while translating UnboundedSource: " + transform.getSource(), e);
@@ -171,6 +193,32 @@ class FlinkStreamingTransformTranslators {
     }
   }
 
+  private static class ValueWithRecordIdKeySelector<T>
+      implements KeySelector<WindowedValue<ValueWithRecordId<T>>, ByteBuffer>,
+      ResultTypeQueryable<ByteBuffer> {
+
+    @Override
+    public ByteBuffer getKey(WindowedValue<ValueWithRecordId<T>> value) throws Exception {
+      return ByteBuffer.wrap(value.getValue().getId());
+    }
+
+    @Override
+    public TypeInformation<ByteBuffer> getProducedType() {
+      return new GenericTypeInfo<>(ByteBuffer.class);
+    }
+  }
+
+  public static class StripIdsMap<T> implements
+      FlatMapFunction<WindowedValue<ValueWithRecordId<T>>, WindowedValue<T>> {
+
+    @Override
+    public void flatMap(WindowedValue<ValueWithRecordId<T>> value,
+        Collector<WindowedValue<T>> collector) throws Exception {
+      collector.collect(value.withValue(value.getValue().getValue()));
+    }
+
+  }
+
   private static class BoundedReadSourceTranslator<T>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
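The operator introduced below keeps one Guava cache per Flink key group (so its
state can be snapshotted and redistributed per key group); each inner cache maps a
record id to an AtomicBoolean that loads as true. The first getAndSet(false) per id
wins, so an element is emitted exactly once until its entry expires (10 minutes
since last access) or is evicted (at 100,000 entries per key group). A runnable
sketch of just that trick outside any Flink machinery -- the DedupCacheDemo harness
and the id values are illustrative:

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import java.nio.ByteBuffer;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class DedupCacheDemo {
      public static void main(String[] args) {
        LoadingCache<ByteBuffer, AtomicBoolean> seen = CacheBuilder.newBuilder()
            .expireAfterAccess(10, TimeUnit.MINUTES)
            .maximumSize(100_000)
            .build(new CacheLoader<ByteBuffer, AtomicBoolean>() {
              @Override
              public AtomicBoolean load(ByteBuffer ignored) {
                return new AtomicBoolean(true); // unseen ids load as "emit me"
              }
            });

        ByteBuffer id = ByteBuffer.wrap("id-1".getBytes());
        System.out.println(seen.getUnchecked(id).getAndSet(false)); // true  -> emit
        System.out.println(seen.getUnchecked(id).getAndSet(false)); // false -> drop duplicate
      }
    }

Note that expiry bounds both memory use and the window in which duplicates are
detected: a duplicate arriving more than MAX_RETENTION_SINCE_ACCESS after the last
sighting of its id will be emitted again.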
http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
new file mode 100644
index 0000000..b8b40fe
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.joda.time.Duration;
+
+/**
+ * Removes values with duplicate ids.
+ */
+public class DedupingOperator<T> extends AbstractStreamOperator<WindowedValue<T>>
+    implements OneInputStreamOperator<WindowedValue<ValueWithRecordId<T>>, WindowedValue<T>>,
+    KeyGroupCheckpointedOperator {
+
+  private static final long MAX_RETENTION_SINCE_ACCESS = Duration.standardMinutes(10L).getMillis();
+  private static final long MAX_CACHE_SIZE = 100_000L;
+
+  private transient LoadingCache<Integer, LoadingCache<ByteBuffer, AtomicBoolean>> dedupingCache;
+  private transient KeyedStateBackend<ByteBuffer> keyedStateBackend;
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    checkInitCache();
+    keyedStateBackend = getKeyedStateBackend();
+  }
+
+  private void checkInitCache() {
+    if (dedupingCache == null) {
+      dedupingCache = CacheBuilder.newBuilder().build(new KeyGroupLoader());
+    }
+  }
+
+  private static class KeyGroupLoader extends
+      CacheLoader<Integer, LoadingCache<ByteBuffer, AtomicBoolean>> {
+    @Override
+    public LoadingCache<ByteBuffer, AtomicBoolean> load(Integer ignore) throws Exception {
+      return CacheBuilder.newBuilder()
+          .expireAfterAccess(MAX_RETENTION_SINCE_ACCESS, TimeUnit.MILLISECONDS)
+          .maximumSize(MAX_CACHE_SIZE).build(new TrueBooleanLoader());
+    }
+  }
+
+  private static class TrueBooleanLoader extends CacheLoader<ByteBuffer, AtomicBoolean> {
+    @Override
+    public AtomicBoolean load(ByteBuffer ignore) throws Exception {
+      return new AtomicBoolean(true);
+    }
+  }
+
+  @Override
+  public void processElement(
+      StreamRecord<WindowedValue<ValueWithRecordId<T>>> streamRecord) throws Exception {
+    ByteBuffer currentKey = keyedStateBackend.getCurrentKey();
+    int groupIndex = keyedStateBackend.getCurrentKeyGroupIndex();
+    if (shouldOutput(groupIndex, currentKey)) {
+      WindowedValue<ValueWithRecordId<T>> value = streamRecord.getValue();
+      output.collect(streamRecord.replace(value.withValue(value.getValue().getValue())));
+    }
+  }
+
+  private boolean shouldOutput(int groupIndex, ByteBuffer id) throws ExecutionException {
+    return dedupingCache.get(groupIndex).getUnchecked(id).getAndSet(false);
+  }
+
+  @Override
+  public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception {
+    checkInitCache();
+    Integer size = VarIntCoder.of().decode(in, Context.NESTED);
+    for (int i = 0; i < size; i++) {
+      byte[] idBytes = ByteArrayCoder.of().decode(in, Context.NESTED);
+      // restore the ids which have not expired.
+      shouldOutput(keyGroupIndex, ByteBuffer.wrap(idBytes));
+    }
+  }
+
+  @Override
+  public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
+    Set<ByteBuffer> ids = dedupingCache.get(keyGroupIndex).asMap().keySet();
+    VarIntCoder.of().encode(ids.size(), out, Context.NESTED);
+    for (ByteBuffer id : ids) {
+      ByteArrayCoder.of().encode(id.array(), out, Context.NESTED);
+    }
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    // copied from AbstractStreamOperator
+    if (getKeyedStateBackend() != null) {
+      KeyedStateCheckpointOutputStream out;
+
+      try {
+        out = context.getRawKeyedOperatorStateOutput();
+      } catch (Exception exception) {
+        throw new Exception("Could not open raw keyed operator state stream for "
+            + getOperatorName() + '.', exception);
+      }
+
+      try {
+        KeyGroupsList allKeyGroups = out.getKeyGroupList();
+        for (int keyGroupIdx : allKeyGroups) {
+          out.startNewKeyGroup(keyGroupIdx);
+
+          DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
+
+          // if (this instanceof KeyGroupCheckpointedOperator)
+          snapshotKeyGroupState(keyGroupIdx, dov);
+
+        }
+      } catch (Exception exception) {
+        throw new Exception("Could not write timer service of " + getOperatorName()
+            + " to checkpoint state stream.", exception);
+      } finally {
+        try {
+          out.close();
+        } catch (Exception closeException) {
+          LOG.warn("Could not close raw keyed operator state stream for {}. This "
+              + "might have prevented deleting some state data.", getOperatorName(),
+              closeException);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+    if (getKeyedStateBackend() != null) {
+      KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange();
+
+      for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
+        DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream());
+
+        int keyGroupIdx = streamProvider.getKeyGroupId();
+        checkArgument(localKeyGroupRange.contains(keyGroupIdx),
+            "Key Group " + keyGroupIdx + " does not belong to the local range.");
+
+        // if (this instanceof KeyGroupRestoringOperator)
+        restoreKeyGroupState(keyGroupIdx, div);
+
+      }
+    }
+  }
+
+}
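On checkpoints the operator writes, per key group, a VarInt count followed by the
raw bytes of each id, with every entry NESTED-encoded so it is self-delimiting; on
restore it replays the ids through shouldOutput(...) to repopulate the cache.
(Calling id.array() is safe here only because every key is built with
ByteBuffer.wrap over exactly the id bytes.) A standalone round-trip of that
encoding using the same Beam coders -- the SnapshotFormatDemo harness and the
sample ids are illustrative:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.Coder.Context;
    import org.apache.beam.sdk.coders.VarIntCoder;

    public class SnapshotFormatDemo {
      public static void main(String[] args) throws IOException {
        byte[][] ids = {"id-1".getBytes(), "id-2".getBytes()};

        // Write: count first, then each id, mirroring snapshotKeyGroupState.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        VarIntCoder.of().encode(ids.length, out, Context.NESTED);
        for (byte[] id : ids) {
          ByteArrayCoder.of().encode(id, out, Context.NESTED);
        }

        // Read back, mirroring restoreKeyGroupState.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        int size = VarIntCoder.of().decode(in, Context.NESTED);
        for (int i = 0; i < size; i++) {
          System.out.println(new String(ByteArrayCoder.of().decode(in, Context.NESTED)));
        }
      }
    }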
http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index ee20fd5..a731e2b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.state.ListState;
@@ -60,7 +61,7 @@ import org.slf4j.LoggerFactory;
  */
 public class UnboundedSourceWrapper<
     OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
-    extends RichParallelSourceFunction<WindowedValue<OutputT>>
+    extends RichParallelSourceFunction<WindowedValue<ValueWithRecordId<OutputT>>>
     implements ProcessingTimeCallback, StoppableFunction,
     CheckpointListener, CheckpointedFunction {
@@ -113,7 +114,7 @@ public class UnboundedSourceWrapper<
    * Make it a field so that we can access it in {@link #onProcessingTime(long)} for emitting
    * watermarks.
    */
-  private transient SourceContext<WindowedValue<OutputT>> context;
+  private transient SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> context;
 
   /**
    * Pending checkpoints which have not been acknowledged yet.
@@ -210,7 +211,7 @@ public class UnboundedSourceWrapper<
   }
 
   @Override
-  public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
+  public void run(SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx) throws Exception {
 
     context = ctx;
 
@@ -306,17 +307,19 @@ public class UnboundedSourceWrapper<
    * Emit the current element from the given Reader. The reader is guaranteed to have data.
    */
   private void emitElement(
-      SourceContext<WindowedValue<OutputT>> ctx,
+      SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx,
       UnboundedSource.UnboundedReader<OutputT> reader) {
     // make sure that reader state update and element emission are atomic
     // with respect to snapshots
     synchronized (ctx.getCheckpointLock()) {
 
       OutputT item = reader.getCurrent();
+      byte[] recordId = reader.getCurrentRecordId();
       Instant timestamp = reader.getCurrentTimestamp();
 
-      WindowedValue<OutputT> windowedValue =
-          WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+      WindowedValue<ValueWithRecordId<OutputT>> windowedValue =
+          WindowedValue.of(new ValueWithRecordId<>(item, recordId), timestamp,
+              GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
       ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
     }
   }
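The wrapper change is mechanical: every SourceContext and every emitted element now
carries ValueWithRecordId<OutputT> instead of OutputT, with the id taken from
UnboundedReader.getCurrentRecordId(). The emitted shape, reproduced standalone --
the EmitShapeDemo harness and the literal values are illustrative:

    import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.beam.sdk.values.ValueWithRecordId;
    import org.joda.time.Instant;

    public class EmitShapeDemo {
      public static void main(String[] args) {
        Instant timestamp = Instant.now();
        // Same construction as emitElement: payload + record id, reader timestamp,
        // global window, no pane firing.
        WindowedValue<ValueWithRecordId<String>> windowedValue =
            WindowedValue.of(new ValueWithRecordId<>("item", "id-7".getBytes()),
                timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
        System.out.println(windowedValue.getValue().getValue());  // prints "item"
      }
    }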
http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
new file mode 100644
index 0000000..81efa34
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DedupingOperator}.
+ */
+@RunWith(JUnit4.class)
+public class DedupingOperatorTest {
+
+  @Test
+  public void testDeduping() throws Exception {
+
+    KeyedOneInputStreamOperatorTestHarness<
+        ByteBuffer,
+        WindowedValue<ValueWithRecordId<String>>,
+        WindowedValue<String>> harness = getDedupingHarness();
+
+    harness.open();
+
+    String key1 = "key1";
+    String key2 = "key2";
+
+    harness.processElement(new StreamRecord<>(
+        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key1, key1.getBytes()))));
+
+    harness.processElement(new StreamRecord<>(
+        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key2, key2.getBytes()))));
+
+    harness.processElement(new StreamRecord<>(
+        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key1, key1.getBytes()))));
+
+    assertThat(
+        this.stripStreamRecordFromWindowedValue(harness.getOutput()),
+        contains(WindowedValue.valueInGlobalWindow(key1),
+            WindowedValue.valueInGlobalWindow(key2)));
+
+    OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+
+    harness.close();
+
+    harness = getDedupingHarness();
+    harness.setup();
+    harness.initializeState(snapshot);
+    harness.open();
+
+    String key3 = "key3";
+
+    harness.processElement(new StreamRecord<>(
+        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key2, key2.getBytes()))));
+
+    harness.processElement(new StreamRecord<>(
+        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key3, key3.getBytes()))));
+
+    assertThat(
+        this.stripStreamRecordFromWindowedValue(harness.getOutput()),
+        contains(WindowedValue.valueInGlobalWindow(key3)));
+
+    harness.close();
+  }
+
+  private KeyedOneInputStreamOperatorTestHarness<ByteBuffer,
+      WindowedValue<ValueWithRecordId<String>>,
+      WindowedValue<String>> getDedupingHarness() throws Exception {
+    DedupingOperator<String> operator = new DedupingOperator<>();
+
+    return new KeyedOneInputStreamOperatorTestHarness<>(operator,
+        new KeySelector<WindowedValue<ValueWithRecordId<String>>, ByteBuffer>() {
+          @Override
+          public ByteBuffer getKey(
+              WindowedValue<ValueWithRecordId<String>> value) throws Exception {
+            return ByteBuffer.wrap(value.getValue().getId());
+          }
+        }, TypeInformation.of(ByteBuffer.class));
+  }
+
+  private <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue(
+      Iterable<Object> input) {
+
+    return FluentIterable.from(input).filter(new Predicate<Object>() {
+      @Override
+      public boolean apply(@Nullable Object o) {
+        return o instanceof StreamRecord
+            && ((StreamRecord) o).getValue() instanceof WindowedValue;
+      }
+    }).transform(new Function<Object, WindowedValue<T>>() {
+      @Nullable
+      @Override
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      public WindowedValue<T> apply(@Nullable Object o) {
+        if (o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof WindowedValue) {
+          return (WindowedValue) ((StreamRecord) o).getValue();
+        }
+        throw new RuntimeException("unreachable");
+      }
+    });
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 0cb528a..500fa66 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.state.ListState;
@@ -116,7 +117,7 @@ public class UnboundedSourceWrapperTest {
     assertEquals(numSplits, flinkWrapper.getSplitSources().size());
 
     StreamSource<
-        WindowedValue<KV<Integer, Integer>>,
+        WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
         UnboundedSourceWrapper<
             KV<Integer, Integer>, TestCountingSource.CounterMark>> sourceOperator =
         new StreamSource<>(flinkWrapper);
@@ -126,7 +127,7 @@
     try {
       sourceOperator.open();
       sourceOperator.run(checkpointLock,
-          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+          new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
             private int count = 0;
 
             @Override
@@ -138,8 +139,8 @@
             }
 
             @Override
-            public void collect(
-                StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
+            public void collect(StreamRecord<WindowedValue<
+                ValueWithRecordId<KV<Integer, Integer>>>> windowedValueStreamRecord) {
 
               count++;
               if (count >= numElements) {
@@ -184,7 +185,7 @@ public class UnboundedSourceWrapperTest {
     assertEquals(numSplits, flinkWrapper.getSplitSources().size());
 
     StreamSource<
-        WindowedValue<KV<Integer, Integer>>,
+        WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
         UnboundedSourceWrapper<
             KV<Integer, Integer>, TestCountingSource.CounterMark>> sourceOperator =
         new StreamSource<>(flinkWrapper);
@@ -214,7 +215,7 @@
     try {
      sourceOperator.open();
       sourceOperator.run(checkpointLock,
-          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+          new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
             private int count = 0;
 
             @Override
@@ -226,10 +227,10 @@
             }
 
             @Override
-            public void collect(
-                StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
+            public void collect(StreamRecord<WindowedValue<
+                ValueWithRecordId<KV<Integer, Integer>>>> windowedValueStreamRecord) {
 
-              emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+              emittedElements.add(windowedValueStreamRecord.getValue().getValue().getValue());
               count++;
               if (count >= numElements / 2) {
                 throw new SuccessException();
@@ -275,7 +276,7 @@ public class UnboundedSourceWrapperTest {
     assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
 
     StreamSource<
-        WindowedValue<KV<Integer, Integer>>,
+        WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
         UnboundedSourceWrapper<
            KV<Integer, Integer>, TestCountingSource.CounterMark>> restoredSourceOperator =
         new StreamSource<>(restoredFlinkWrapper);
@@ -292,7 +293,7 @@
     try {
       restoredSourceOperator.open();
       restoredSourceOperator.run(checkpointLock,
-          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+          new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
            private int count = 0;
 
             @Override
@@ -304,9 +305,9 @@
             }
 
             @Override
-            public void collect(
-                StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-              emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+            public void collect(StreamRecord<WindowedValue<
+                ValueWithRecordId<KV<Integer, Integer>>>> windowedValueStreamRecord) {
+              emittedElements.add(windowedValueStreamRecord.getValue().getValue().getValue());
               count++;
               if (count >= numElements / 2) {
                 throw new SuccessException();