From: kenn@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Thu, 30 Mar 2017 04:09:07 -0000
Subject: [1/2] beam git commit: [BEAM-972] Add unit tests to Gearpump runner

Repository: beam
Updated Branches:
  refs/heads/gearpump-runner 555842a1a -> f4f233304

[BEAM-972] Add unit tests to Gearpump runner

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eb0d333d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eb0d333d
Diff:
http://git-wip-us.apache.org/repos/asf/beam/diff/eb0d333d

Branch: refs/heads/gearpump-runner
Commit: eb0d333df23624f54aae2abb8d7c7873f8ed2a7a
Parents: 555842a
Author: huafengw
Authored: Tue Mar 21 19:45:10 2017 +0800
Committer: huafengw
Committed: Thu Mar 23 19:52:11 2017 +0800

----------------------------------------------------------------------
 examples/java/pom.xml                           | 12 +++
 pom.xml                                         |  6 ++
 runners/gearpump/README.md                      | 41 ++++++++-
 runners/gearpump/pom.xml                        |  2 -
 .../gearpump/GearpumpRunnerRegistrar.java       |  4 +-
 .../translators/WindowAssignTranslator.java     |  2 +-
 .../gearpump/translators/io/ValuesSource.java   |  2 -
 .../gearpump/GearpumpRunnerRegistrarTest.java   | 55 ++++++++++++
 .../runners/gearpump/PipelineOptionsTest.java   | 73 ++++++++++++++++
 .../translators/io/GearpumpSourceTest.java      | 90 ++++++++++++++++++++
 .../gearpump/translators/io/ValueSoureTest.java | 82 ++++++++++++++++++
 11 files changed, 362 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index ed4a1d4..0a6d8fe 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -87,6 +87,18 @@
+
+
+ gearpump-runner
+
+
+ org.apache.beam
+ beam-runners-gearpump
+ runtime
+
+
+
+
 flink-runner

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c3b8476..2cdb09d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -475,6 +475,12 @@
 org.apache.beam
+ beam-runners-gearpump
+ ${project.version}
+
+
+
+ org.apache.beam
 beam-examples-java
 ${project.version}

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/README.md
----------------------------------------------------------------------
diff --git a/runners/gearpump/README.md b/runners/gearpump/README.md
index ad043fa..e8ce794 100644
--- a/runners/gearpump/README.md
+++ b/runners/gearpump/README.md
@@ -19,4 +19,43 @@
 
 ## Gearpump Beam Runner
 
-The Gearpump Beam runner allows users to execute pipelines written using the Apache Beam programming API with Apache Gearpump (incubating) as an execution engine.
\ No newline at end of file
+The Gearpump Beam runner allows users to execute pipelines written using the Apache Beam programming API with Apache Gearpump (incubating) as an execution engine.
+
+##Getting Started
+
+The following shows how to run the WordCount example that is provided with the source code on Beam.
+
+###Installing Beam
+
+To get the latest version of Beam with Gearpump-Runner, first clone the Beam repository:
+
+```
+git clone https://github.com/apache/beam
+git checkout gearpump-runner
+```
+
+Then switch to the newly created directory and run Maven to build the Apache Beam:
+
+```
+cd beam
+mvn clean install -DskipTests
+```
+
+Now Apache Beam and the Gearpump Runner are installed in your local Maven repository.
+
+###Running Wordcount Example
+
+Download something to count:
+
+```
+curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /tmp/kinglear.txt
+```
+
+Run the pipeline, using the Gearpump runner:
+
+```
+cd examples/java
+mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=/tmp/kinglear.txt --output=/tmp/wordcounts.txt --runner=TestGearpumpRunner" -Pgearpump-runner
+```
+
+Once completed, check the output file /tmp/wordcounts.txt-00000-of-00001

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 9a6a432..a691801 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -99,13 +99,11 @@
 org.apache.gearpump
 gearpump-streaming_2.11
 ${gearpump.version}
- provided
 org.apache.gearpump
 gearpump-core_2.11
 ${gearpump.version}
- provided
 com.typesafe

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
index b77e1e3..3183d45 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
@@ -44,7 +44,9 @@ public class GearpumpRunnerRegistrar {
 
     @Override
     public Iterable>> getPipelineRunners() {
-      return ImmutableList.>>of(TestGearpumpRunner.class);
+      return ImmutableList.>>of(
+          GearpumpRunner.class,
+          TestGearpumpRunner.class);
     }
   }
 
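
The registrar hunk above adds GearpumpRunner.class next to TestGearpumpRunner.class in the list returned by getPipelineRunners(). As a rough illustration of why that list matters, here is a minimal plain-Java sketch of a registrar whose entries are resolved by simple class name, in the way a --runner=TestGearpumpRunner argument might be. Every type and method name below (SketchRunner, SketchRunnerRegistrar, lookup) is hypothetical and stands in for the Beam classes; the lookup-by-simple-name rule is an assumption made for the sketch, not a statement of how Beam resolves runners.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT the Beam types.
interface SketchRunner {}

final class SketchGearpumpRunner implements SketchRunner {}

final class SketchTestGearpumpRunner implements SketchRunner {}

final class SketchRunnerRegistrar {
  private SketchRunnerRegistrar() {}

  /** Lists every runner class this module contributes. */
  static List<Class<? extends SketchRunner>> getPipelineRunners() {
    return Collections.unmodifiableList(
        Arrays.<Class<? extends SketchRunner>>asList(
            SketchGearpumpRunner.class, SketchTestGearpumpRunner.class));
  }

  /** Assumed lookup rule: match a name argument against the simple class names. */
  static Class<? extends SketchRunner> lookup(String simpleName) {
    for (Class<? extends SketchRunner> runner : getPipelineRunners()) {
      if (runner.getSimpleName().equals(simpleName)) {
        return runner;
      }
    }
    throw new IllegalArgumentException("Unknown runner: " + simpleName);
  }

  public static void main(String[] args) {
    System.out.println(lookup("SketchTestGearpumpRunner").getSimpleName());
  }
}
```

Under this assumed rule, a runner class that is missing from the returned list cannot be selected by name, which is one plausible reading of why the commit registers both classes.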
http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
index fe6015a..29d8f02 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
@@ -39,7 +39,7 @@ import org.joda.time.Instant;
  * {@link Window.Bound} is translated to Gearpump flatMap function.
  */
 @SuppressWarnings("unchecked")
-public class WindowAssignTranslator implements TransformTranslator> {
+public class WindowAssignTranslator implements TransformTranslator> {
 
   private static final long serialVersionUID = -964887482120489061L;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
index e0488cd..ccd5cdf 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
@@ -110,8 +110,6 @@ public class ValuesSource extends UnboundedSource serializers = Maps.newHashMap();
+    serializers.put("classA", "SerializerA");
+    GearpumpPipelineOptions options = PipelineOptionsFactory.create()
+      .as(GearpumpPipelineOptions.class);
+    Config config = ClusterConfig.master(null);
+    EmbeddedCluster cluster = new EmbeddedCluster(config);
+    options.setSerializers(serializers);
+    options.setApplicationName(appName);
+    options.setEmbeddedCluster(cluster);
+    options.setParallelism(10);
+
+    byte[] serializedOptions = serialize(options);
+    GearpumpPipelineOptions deserializedOptions = new ObjectMapper()
+      .readValue(serializedOptions, PipelineOptions.class).as(GearpumpPipelineOptions.class);
+
+    assertNull(deserializedOptions.getEmbeddedCluster());
+    assertNull(deserializedOptions.getSerializers());
+    assertEquals(10, deserializedOptions.getParallelism());
+    assertEquals(appName, deserializedOptions.getApplicationName());
+  }
+
+  private byte[] serialize(Object obj) {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      new ObjectMapper().writeValue(baos, obj);
+      return baos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
new file mode 100644
index 0000000..af5a1d2
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.io;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.gearpump.Message;
+import org.apache.gearpump.streaming.source.Watermark;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link GearpumpSource}.
+ */
+public class GearpumpSourceTest {
+  private static final List> TEST_VALUES = Lists.newArrayList(
+    TimestampedValue.of("a", new org.joda.time.Instant(Long.MIN_VALUE)),
+    TimestampedValue.of("b", new org.joda.time.Instant(0)),
+    TimestampedValue.of("c", new org.joda.time.Instant(53)),
+    TimestampedValue.of("d", new org.joda.time.Instant(Long.MAX_VALUE - 1))
+  );
+
+  private static class SourceForTest extends GearpumpSource {
+    private ValuesSource valuesSource;
+
+    SourceForTest(PipelineOptions options, ValuesSource valuesSource) {
+      super(options);
+      this.valuesSource = valuesSource;
+    }
+
+    @Override
+    protected Source.Reader createReader(PipelineOptions options) throws IOException {
+      return this.valuesSource.createReader(options, null);
+    }
+  }
+
+  @Test
+  public void testGearpumpSource() {
+    GearpumpPipelineOptions options = PipelineOptionsFactory.create()
+      .as(GearpumpPipelineOptions.class);
+    ValuesSource> valuesSource = new ValuesSource<>(TEST_VALUES,
+      TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()));
+    SourceForTest> sourceForTest =
+      new SourceForTest<>(options, valuesSource);
+    sourceForTest.open(null, Instant.EPOCH);
+
+    for (TimestampedValue value: TEST_VALUES) {
+      // Check the watermark first since the Source will advance when it's opened
+      Instant expectedWaterMark = TranslatorUtils.jodaTimeToJava8Time(value.getTimestamp());
+      Assert.assertEquals(expectedWaterMark, sourceForTest.getWatermark());
+
+      Message expectedMsg = Message.apply(
+        WindowedValue.timestampedValueInGlobalWindow(value, value.getTimestamp()),
+        value.getTimestamp().getMillis());
+      Message message = sourceForTest.read();
+      Assert.assertEquals(expectedMsg, message);
+    }
+
+    Assert.assertNull(sourceForTest.read());
+    Assert.assertEquals(Watermark.MAX(), sourceForTest.getWatermark());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
new file mode 100644
index 0000000..8c50703
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump.translators.io;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.GearpumpRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+import org.apache.gearpump.util.Constants;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ValuesSource}.
+ */
+public class ValueSoureTest {
+
+  @Test
+  public void testValueSource() {
+    GearpumpPipelineOptions options = PipelineOptionsFactory.create()
+      .as(GearpumpPipelineOptions.class);
+    Config config = ClusterConfig.master(null);
+    config = config.withValue(Constants.APPLICATION_TOTAL_RETRIES(),
+      ConfigValueFactory.fromAnyRef(0));
+    EmbeddedCluster cluster = new EmbeddedCluster(config);
+    cluster.start();
+
+    options.setEmbeddedCluster(cluster);
+    options.setRunner(GearpumpRunner.class);
+    options.setParallelism(1);
+    Pipeline p = Pipeline.create(options);
+    List values = Lists.newArrayList("1", "2", "3", "4", "5");
+    ValuesSource source = new ValuesSource<>(values, StringUtf8Coder.of());
+    p.apply(Read.from(source))
+      .apply(ParDo.of(new ResultCollector()));
+
+    p.run().waitUntilFinish();
+    cluster.stop();
+
+    Assert.assertEquals(Sets.newHashSet(values), ResultCollector.RESULTS);
+  }
+
+  private static class ResultCollector extends DoFn {
+    private static final Set RESULTS = Collections.synchronizedSet(new HashSet<>());
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      RESULTS.add(c.element());
+    }
+  }
+}
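
The ResultCollector in the test above gathers every processed element into a static synchronized set, and the test compares that set with the input values once the pipeline has finished. The same collection pattern can be sketched in plain Java without Beam or Gearpump. The class CollectorSketch, its runAll method, and the four-thread pool are all hypothetical choices for this sketch; the pool merely stands in for whatever workers the runner would use.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; it does not use or imitate the Beam DoFn API.
final class CollectorSketch {
  // Shared sink, wrapped so that concurrent add() calls are safe.
  static final Set<String> RESULTS = Collections.synchronizedSet(new HashSet<String>());

  /** Stand-in for a per-element processing function. */
  static void processElement(String element) {
    RESULTS.add(element);
  }

  /** Processes each value on a small thread pool, waits, then returns a snapshot. */
  static Set<String> runAll(List<String> values) {
    RESULTS.clear();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (final String value : values) {
      pool.submit(() -> processElement(value));
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for workers", e);
    }
    // All workers are done, so copying the set here needs no extra locking.
    return new HashSet<>(RESULTS);
  }

  public static void main(String[] args) {
    List<String> values = Arrays.asList("1", "2", "3", "4", "5");
    System.out.println(runAll(values).equals(new HashSet<>(values)));
  }
}
```

A static field is only visible inside one JVM, so this style of assertion depends on the workers running in the same process as the test. That seems to match the test's use of an EmbeddedCluster, though the diff itself does not state it.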