Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EEA9B200C5A for ; Tue, 18 Apr 2017 22:25:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id ECFCE160BA1; Tue, 18 Apr 2017 20:25:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4A783160B87 for ; Tue, 18 Apr 2017 22:25:53 +0200 (CEST) Received: (qmail 16602 invoked by uid 500); 18 Apr 2017 20:25:52 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 16587 invoked by uid 99); 18 Apr 2017 20:25:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Apr 2017 20:25:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 542DEDFDAC; Tue, 18 Apr 2017 20:25:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.apache.org Date: Tue, 18 Apr 2017 20:25:52 -0000 Message-Id: <7b2090d7ed1e4132a86822f6b6408c86@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: [BEAM-972] Add more unit test to Gearpump runner archived-at: Tue, 18 Apr 2017 20:25:55 -0000 Repository: beam Updated Branches: refs/heads/gearpump-runner f4f233304 -> ebbb61390 [BEAM-972] Add more unit test to Gearpump runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f3138dde Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f3138dde Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f3138dde Branch: refs/heads/gearpump-runner Commit: f3138dde12b0c6a5cfdb8fefab916a6060b7f5ea Parents: f4f2333 Author: huafengw Authored: Wed Apr 12 19:11:09 2017 +0800 Committer: huafengw Committed: Thu Apr 13 10:06:55 2017 +0800 ---------------------------------------------------------------------- .../FlattenPCollectionsTranslator.java | 2 +- .../translators/GroupByKeyTranslator.java | 20 ++- .../translators/WindowAssignTranslator.java | 5 +- ...teGearpumpPCollectionViewTranslatorTest.java | 57 +++++++ .../CreatePCollectionViewTranslatorTest.java | 55 +++++++ .../FlattenPCollectionsTranslatorTest.java | 137 +++++++++++++++++ .../translators/GroupByKeyTranslatorTest.java | 151 +++++++++++++++++++ .../translators/ReadBoundedTranslatorTest.java | 70 +++++++++ .../ReadUnboundedTranslatorTest.java | 70 +++++++++ .../translators/WindowAssignTranslatorTest.java | 110 ++++++++++++++ .../translators/io/GearpumpSourceTest.java | 36 ++--- .../gearpump/translators/io/ValueSoureTest.java | 15 +- 12 files changed, 695 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java index 3a465cb..56f7d1a 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java @@ -47,7 +47,6 @@ public class FlattenPCollectionsTranslator implements Set> unique = new HashSet<>(); for (TaggedPValue input: context.getInputs()) { PCollection collection = (PCollection) input.getValue(); - unique.add(collection); JavaStream inputStream = context.getInputStream(collection); if (null == merged) { merged = inputStream; @@ -60,6 +59,7 @@ public class FlattenPCollectionsTranslator implements merged = merged.merge(inputStream, transform.getName()); } + unique.add(collection); } if (null == merged) { http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java index 5dfd3e9..54c8737 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java @@ -83,7 +83,10 @@ public class GroupByKeyTranslator implements TransformTranslator + /** + * A transform used internally to translate Beam's Window to Gearpump's Window. + */ + protected static class GearpumpWindowFn implements WindowFunction>, Serializable { private final boolean isNonMerging; @@ -115,7 +118,10 @@ public class GroupByKeyTranslator implements TransformTranslator extends + /** + * A transform used internally to group KV message by its key. + */ + protected static class GroupByFn extends GroupByFunction>, ByteBuffer> { private static final long serialVersionUID = -807905402490735530L; @@ -135,7 +141,10 @@ public class GroupByKeyTranslator implements TransformTranslator + /** + * A transform used internally to transform WindowedValue to KV. + */ + protected static class KeyedByTimestamp extends MapFunction>, KV>>> { @@ -154,7 +163,10 @@ public class GroupByKeyTranslator implements TransformTranslator extends + /** + * A transform used internally by Gearpump which encapsulates the merge logic. + */ + protected static class Merge extends FoldFunction>>, KV>>>> { http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java index 29d8f02..2d70b63 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java @@ -57,7 +57,10 @@ public class WindowAssignTranslator implements TransformTranslator extends + /** + * A Function used internally by Gearpump to wrap the actual Beam's WindowFn. + */ + protected static class AssignWindows extends FlatMapFunction, WindowedValue> { private static final long serialVersionUID = 7284565861938681360L; http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java new file mode 100644 index 0000000..b23b0c6 --- /dev/null +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.gearpump.GearpumpPipelineTranslator; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.junit.Test; + +/** Tests for {@link CreateGearpumpPCollectionViewTranslator}. */ +public class CreateGearpumpPCollectionViewTranslatorTest { + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testTranslate() { + CreateGearpumpPCollectionViewTranslator translator = + new CreateGearpumpPCollectionViewTranslator(); + + GearpumpPipelineTranslator.CreateGearpumpPCollectionView pCollectionView = + mock(GearpumpPipelineTranslator.CreateGearpumpPCollectionView.class); + + JavaStream javaStream = mock(JavaStream.class); + TranslationContext translationContext = mock(TranslationContext.class); + + PValue mockInput = mock(PValue.class); + when(translationContext.getInput()).thenReturn(mockInput); + when(translationContext.getInputStream(mockInput)).thenReturn(javaStream); + + PCollectionView view = mock(PCollectionView.class); + when(translationContext.getOutput()).thenReturn(view); + + translator.translate(pCollectionView, translationContext); + verify(translationContext, times(1)).setOutputStream(view, javaStream); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java new file mode 100644 index 0000000..42ff14e --- /dev/null +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.junit.Test; + +/** Tests for {@link CreatePCollectionViewTranslator}. */ +public class CreatePCollectionViewTranslatorTest { + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testTranslate() { + CreatePCollectionViewTranslator translator = new CreatePCollectionViewTranslator(); + View.CreatePCollectionView> createView = + mock(View.CreatePCollectionView.class); + + JavaStream javaStream = mock(JavaStream.class); + TranslationContext translationContext = mock(TranslationContext.class); + + PValue mockInput = mock(PValue.class); + when(translationContext.getInput()).thenReturn(mockInput); + when(translationContext.getInputStream(mockInput)).thenReturn(javaStream); + + PCollectionView view = mock(PCollectionView.class); + when(translationContext.getOutput()).thenReturn(view); + + translator.translate(createView, translationContext); + verify(translationContext, times(1)).setOutputStream(view, javaStream); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java new file mode 100644 index 0000000..fa89d4a --- /dev/null +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.Lists; +import java.util.Collections; +import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.gearpump.streaming.dsl.api.functions.MapFunction; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.source.DataSource; +import org.junit.Test; +import org.mockito.ArgumentMatcher; + +/** Tests for {@link FlattenPCollectionsTranslator}. */ +public class FlattenPCollectionsTranslatorTest { + + private FlattenPCollectionsTranslator translator = new FlattenPCollectionsTranslator(); + private Flatten.PCollections transform = mock(Flatten.PCollections.class); + + class UnboundedSourceWrapperMatcher extends ArgumentMatcher { + @Override + public boolean matches(Object o) { + return o instanceof UnboundedSourceWrapper; + } + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testTranslateWithEmptyCollection() { + PValue mockOutput = mock(PValue.class); + TranslationContext translationContext = mock(TranslationContext.class); + + when(translationContext.getInputs()).thenReturn(Collections.EMPTY_LIST); + when(translationContext.getOutput()).thenReturn(mockOutput); + + translator.translate(transform, translationContext); + verify(translationContext).getSourceStream(argThat(new UnboundedSourceWrapperMatcher())); + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testTranslateWithOneCollection() { + JavaStream javaStream = mock(JavaStream.class); + TranslationContext translationContext = mock(TranslationContext.class); + + TaggedPValue mockInput = mock(TaggedPValue.class); + PCollection mockCollection = mock(PCollection.class); + when(mockInput.getValue()).thenReturn(mockCollection); + + when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput)); + when(translationContext.getInputStream(mockCollection)).thenReturn(javaStream); + + PValue mockOutput = mock(PValue.class); + when(translationContext.getOutput()).thenReturn(mockOutput); + + translator.translate(transform, translationContext); + verify(translationContext, times(1)).setOutputStream(mockOutput, javaStream); + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testWithMoreThanOneCollections() { + String transformName = "transform"; + when(transform.getName()).thenReturn(transformName); + + JavaStream javaStream1 = mock(JavaStream.class); + JavaStream javaStream2 = mock(JavaStream.class); + TranslationContext translationContext = mock(TranslationContext.class); + + TaggedPValue mockInput1 = mock(TaggedPValue.class); + PCollection mockCollection1 = mock(PCollection.class); + when(mockInput1.getValue()).thenReturn(mockCollection1); + + TaggedPValue mockInput2 = mock(TaggedPValue.class); + PCollection mockCollection2 = mock(PCollection.class); + when(mockInput2.getValue()).thenReturn(mockCollection2); + + when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput1, mockInput2)); + when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1); + when(translationContext.getInputStream(mockCollection2)).thenReturn(javaStream2); + + translator.translate(transform, translationContext); + verify(javaStream1).merge(javaStream2, transformName); + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testWithDuplicatedCollections() { + String transformName = "transform"; + when(transform.getName()).thenReturn(transformName); + + JavaStream javaStream1 = mock(JavaStream.class); + TranslationContext translationContext = mock(TranslationContext.class); + + PCollection mockCollection1 = mock(PCollection.class); + TaggedPValue mockInput1 = mock(TaggedPValue.class); + when(mockInput1.getValue()).thenReturn(mockCollection1); + + TaggedPValue mockInput2 = mock(TaggedPValue.class); + when(mockInput2.getValue()).thenReturn(mockCollection1); + + when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput1, mockInput2)); + when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1); + + translator.translate(transform, translationContext); + verify(javaStream1).map(any(MapFunction.class), eq("dummy")); + verify(javaStream1).merge(any(JavaStream.class), eq(transformName)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java new file mode 100644 index 0000000..9135022 --- /dev/null +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import java.time.Instant; +import java.util.Collection; +import java.util.List; + +import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator.GearpumpWindowFn; +import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.gearpump.streaming.dsl.window.api.WindowFunction; +import org.apache.gearpump.streaming.dsl.window.impl.Window; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** Tests for {@link GroupByKeyTranslator}. */ +@RunWith(Parameterized.class) +public class GroupByKeyTranslatorTest { + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testGearpumpWindowFn() { + GearpumpWindowFn windowFn = new GearpumpWindowFn(true); + List windows = + Lists.newArrayList( + new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10)), + new IntervalWindow(new org.joda.time.Instant(5), new org.joda.time.Instant(15))); + + WindowFunction.Context> context = + new WindowFunction.Context>() { + @Override + public Instant timestamp() { + return Instant.EPOCH; + } + + @Override + public WindowedValue element() { + return WindowedValue.of( + "v1", new org.joda.time.Instant(6), windows, PaneInfo.NO_FIRING); + } + }; + + Window[] result = windowFn.apply(context); + List expected = Lists.newArrayList(); + for (BoundedWindow w : windows) { + expected.add(TranslatorUtils.boundedWindowToGearpumpWindow(w)); + } + assertThat(result, equalTo(expected.toArray())); + } + + @Parameterized.Parameters(name = "{index}: {0}") + public static Iterable> data() { + return ImmutableList.of( + OutputTimeFns.outputAtEarliestInputTimestamp(), + OutputTimeFns.outputAtLatestInputTimestamp(), + OutputTimeFns.outputAtEndOfWindow()); + } + + @Parameterized.Parameter(0) + public OutputTimeFn outputTimeFn; + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testKeyedByTimestamp() { + BoundedWindow window = + new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10)); + GroupByKeyTranslator.KeyedByTimestamp keyedByTimestamp = + new GroupByKeyTranslator.KeyedByTimestamp(outputTimeFn); + WindowedValue> value = + WindowedValue.of( + KV.of("key", "val"), org.joda.time.Instant.now(), window, PaneInfo.NO_FIRING); + KV>> result = + keyedByTimestamp.map(value); + org.joda.time.Instant time = + outputTimeFn.assignOutputTime( + value.getTimestamp(), Iterables.getOnlyElement(value.getWindows())); + assertThat(result, equalTo(KV.of(time, value))); + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testMerge() { + WindowFn slidingWindows = Sessions.withGapDuration(Duration.millis(10)); + GroupByKeyTranslator.Merge merge = new GroupByKeyTranslator.Merge(slidingWindows, outputTimeFn); + org.joda.time.Instant key1 = new org.joda.time.Instant(5); + WindowedValue> value1 = + WindowedValue.of( + KV.of("key1", "value1"), + key1, + new IntervalWindow(new org.joda.time.Instant(5), new org.joda.time.Instant(10)), + PaneInfo.NO_FIRING); + + org.joda.time.Instant key2 = new org.joda.time.Instant(10); + WindowedValue> value2 = + WindowedValue.of( + KV.of("key2", "value2"), + key2, + new IntervalWindow(new org.joda.time.Instant(9), new org.joda.time.Instant(14)), + PaneInfo.NO_FIRING); + + KV>>> result1 = + merge.fold(KV.of(null, null), KV.of(key1, value1)); + assertThat(result1.getKey(), equalTo(key1)); + assertThat(result1.getValue().getValue().getValue(), equalTo(Lists.newArrayList("value1"))); + + KV>>> result2 = + merge.fold(result1, KV.of(key2, value2)); + assertThat(result2.getKey(), equalTo(outputTimeFn.combine(key1, key2))); + Collection resultWindows = result2.getValue().getWindows(); + assertThat(resultWindows.size(), equalTo(1)); + IntervalWindow expectedWindow = + new IntervalWindow(new org.joda.time.Instant(5), new org.joda.time.Instant(14)); + assertThat(resultWindows.toArray()[0], equalTo(expectedWindow)); + assertThat( + result2.getValue().getValue().getValue(), equalTo(Lists.newArrayList("value1", "value2"))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java new file mode 100644 index 0000000..20ee1a2 --- /dev/null +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; +import org.apache.beam.runners.gearpump.translators.io.BoundedSourceWrapper; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.values.PValue; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.source.DataSource; +import org.junit.Test; +import org.mockito.ArgumentMatcher; + +/** Tests for {@link ReadBoundedTranslator}. */ +public class ReadBoundedTranslatorTest { + + class BoundedSourceWrapperMatcher extends ArgumentMatcher { + @Override + public boolean matches(Object o) { + return o instanceof BoundedSourceWrapper; + } + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testTranslate() { + ReadBoundedTranslator translator = new ReadBoundedTranslator(); + GearpumpPipelineOptions options = + PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class); + Read.Bounded transform = mock(Read.Bounded.class); + BoundedSource source = mock(BoundedSource.class); + when(transform.getSource()).thenReturn(source); + + TranslationContext translationContext = mock(TranslationContext.class); + when(translationContext.getPipelineOptions()).thenReturn(options); + + JavaStream stream = mock(JavaStream.class); + PValue mockOutput = mock(PValue.class); + when(translationContext.getOutput()).thenReturn(mockOutput); + when(translationContext.getSourceStream(any(DataSource.class))).thenReturn(stream); + + translator.translate(transform, translationContext); + verify(translationContext).getSourceStream(argThat(new BoundedSourceWrapperMatcher())); + verify(translationContext).setOutputStream(mockOutput, stream); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java new file mode 100644 index 0000000..f27b568 --- /dev/null +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; +import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.values.PValue; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.source.DataSource; +import org.junit.Test; +import org.mockito.ArgumentMatcher; + +/** Tests for {@link ReadUnboundedTranslator}. */ +public class ReadUnboundedTranslatorTest { + + class UnboundedSourceWrapperMatcher extends ArgumentMatcher { + @Override + public boolean matches(Object o) { + return o instanceof UnboundedSourceWrapper; + } + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testTranslate() { + ReadUnboundedTranslator translator = new ReadUnboundedTranslator(); + GearpumpPipelineOptions options = + PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class); + Read.Unbounded transform = mock(Read.Unbounded.class); + UnboundedSource source = mock(UnboundedSource.class); + when(transform.getSource()).thenReturn(source); + + TranslationContext translationContext = mock(TranslationContext.class); + when(translationContext.getPipelineOptions()).thenReturn(options); + + JavaStream stream = mock(JavaStream.class); + PValue mockOutput = mock(PValue.class); + when(translationContext.getOutput()).thenReturn(mockOutput); + when(translationContext.getSourceStream(any(DataSource.class))).thenReturn(stream); + + translator.translate(transform, translationContext); + verify(translationContext).getSourceStream(argThat(new UnboundedSourceWrapperMatcher())); + verify(translationContext).setOutputStream(mockOutput, stream); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslatorTest.java new file mode 100644 index 0000000..06ccaaf --- /dev/null +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslatorTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Iterator; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; + +/** Tests for {@link WindowAssignTranslator}. */ +public class WindowAssignTranslatorTest { + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testAssignWindowsWithSlidingWindow() { + WindowFn slidingWindows = SlidingWindows.of(Duration.millis(10)).every(Duration.millis(5)); + WindowAssignTranslator.AssignWindows assignWindows = + new WindowAssignTranslator.AssignWindows(slidingWindows); + + String value = "v1"; + Instant timestamp = new Instant(1); + WindowedValue windowedValue = + WindowedValue.timestampedValueInGlobalWindow(value, timestamp); + ArrayList> expected = new ArrayList<>(); + expected.add( + WindowedValue.of( + value, + timestamp, + new IntervalWindow(new Instant(0), new Instant(10)), + PaneInfo.NO_FIRING)); + expected.add( + WindowedValue.of( + value, + timestamp, + new IntervalWindow(new Instant(-5), new Instant(5)), + PaneInfo.NO_FIRING)); + + Iterator> result = assignWindows.flatMap(windowedValue); + assertThat(expected, equalTo(Lists.newArrayList(result))); + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testAssignWindowsWithSessions() { + WindowFn slidingWindows = Sessions.withGapDuration(Duration.millis(10)); + WindowAssignTranslator.AssignWindows assignWindows = + new WindowAssignTranslator.AssignWindows(slidingWindows); + + String value = "v1"; + Instant timestamp = new Instant(1); + WindowedValue windowedValue = + WindowedValue.timestampedValueInGlobalWindow(value, timestamp); + ArrayList> expected = new ArrayList<>(); + expected.add( + WindowedValue.of( + value, + timestamp, + new IntervalWindow(new Instant(1), new Instant(11)), + PaneInfo.NO_FIRING)); + + Iterator> result = assignWindows.flatMap(windowedValue); + assertThat(expected, equalTo(Lists.newArrayList(result))); + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testAssignWindowsGlobal() { + WindowFn slidingWindows = new GlobalWindows(); + WindowAssignTranslator.AssignWindows assignWindows = + new WindowAssignTranslator.AssignWindows(slidingWindows); + + String value = "v1"; + Instant timestamp = new Instant(1); + WindowedValue windowedValue = + WindowedValue.timestampedValueInGlobalWindow(value, timestamp); + ArrayList> expected = new ArrayList<>(); + expected.add(WindowedValue.timestampedValueInGlobalWindow(value, timestamp)); + + Iterator> result = assignWindows.flatMap(windowedValue); + assertThat(expected, equalTo(Lists.newArrayList(result))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java index af5a1d2..b244484 100644 --- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java @@ -37,16 +37,14 @@ import org.apache.gearpump.streaming.source.Watermark; import org.junit.Assert; import org.junit.Test; -/** - * Tests for {@link GearpumpSource}. - */ +/** Tests for {@link GearpumpSource}. */ public class GearpumpSourceTest { - private static final List> TEST_VALUES = Lists.newArrayList( - TimestampedValue.of("a", new org.joda.time.Instant(Long.MIN_VALUE)), - TimestampedValue.of("b", new org.joda.time.Instant(0)), - TimestampedValue.of("c", new org.joda.time.Instant(53)), - TimestampedValue.of("d", new org.joda.time.Instant(Long.MAX_VALUE - 1)) - ); + private static final List> TEST_VALUES = + Lists.newArrayList( + TimestampedValue.of("a", new org.joda.time.Instant(Long.MIN_VALUE)), + TimestampedValue.of("b", new org.joda.time.Instant(0)), + TimestampedValue.of("c", new org.joda.time.Instant(53)), + TimestampedValue.of("d", new org.joda.time.Instant(Long.MAX_VALUE - 1))); private static class SourceForTest extends GearpumpSource { private ValuesSource valuesSource; @@ -64,22 +62,24 @@ public class GearpumpSourceTest { @Test public void testGearpumpSource() { - GearpumpPipelineOptions options = PipelineOptionsFactory.create() - .as(GearpumpPipelineOptions.class); - ValuesSource> valuesSource = new ValuesSource<>(TEST_VALUES, - TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of())); + GearpumpPipelineOptions options = + PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class); + ValuesSource> valuesSource = + new ValuesSource<>( + TEST_VALUES, TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of())); SourceForTest> sourceForTest = - new SourceForTest<>(options, valuesSource); + new SourceForTest<>(options, valuesSource); sourceForTest.open(null, Instant.EPOCH); - for (TimestampedValue value: TEST_VALUES) { + for (TimestampedValue value : TEST_VALUES) { // Check the watermark first since the Source will advance when it's opened Instant expectedWaterMark = TranslatorUtils.jodaTimeToJava8Time(value.getTimestamp()); Assert.assertEquals(expectedWaterMark, sourceForTest.getWatermark()); - Message expectedMsg = Message.apply( - WindowedValue.timestampedValueInGlobalWindow(value, value.getTimestamp()), - value.getTimestamp().getMillis()); + Message expectedMsg = + Message.apply( + WindowedValue.timestampedValueInGlobalWindow(value, value.getTimestamp()), + value.getTimestamp().getMillis()); Message message = sourceForTest.read(); Assert.assertEquals(expectedMsg, message); } http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java index 8c50703..439e1b1 100644 --- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java @@ -41,18 +41,16 @@ import org.apache.gearpump.util.Constants; import org.junit.Assert; import org.junit.Test; -/** - * Tests for {@link ValuesSource}. - */ +/** Tests for {@link ValuesSource}. */ public class ValueSoureTest { @Test public void testValueSource() { - GearpumpPipelineOptions options = PipelineOptionsFactory.create() - .as(GearpumpPipelineOptions.class); + GearpumpPipelineOptions options = + PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class); Config config = ClusterConfig.master(null); - config = config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), - ConfigValueFactory.fromAnyRef(0)); + config = + config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0)); EmbeddedCluster cluster = new EmbeddedCluster(config); cluster.start(); @@ -62,8 +60,7 @@ public class ValueSoureTest { Pipeline p = Pipeline.create(options); List values = Lists.newArrayList("1", "2", "3", "4", "5"); ValuesSource source = new ValuesSource<>(values, StringUtf8Coder.of()); - p.apply(Read.from(source)) - .apply(ParDo.of(new ResultCollector())); + p.apply(Read.from(source)).apply(ParDo.of(new ResultCollector())); p.run().waitUntilFinish(); cluster.stop();