From: dhalperi@apache.org
To: commits@beam.incubator.apache.org
Reply-To: dev@beam.incubator.apache.org
Date: Thu, 24 Mar 2016 02:48:20 -0000
Message-Id: <06cf0f459a4e428cb55ac445762801b4@git.apache.org>
In-Reply-To: <0c1667d8252646c1acedf8c54c1ba4ca@git.apache.org>
References: <0c1667d8252646c1acedf8c54c1ba4ca@git.apache.org>
Subject: [56/67] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamplesTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamplesTest.java
new file mode 100644
index 0000000..3deff2a
--- /dev/null
+++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamplesTest.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.examples.cookbook; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.cloud.dataflow.examples.cookbook.MaxPerKeyExamples.ExtractTempFn; +import com.google.cloud.dataflow.examples.cookbook.MaxPerKeyExamples.FormatMaxesFn; +import com.google.cloud.dataflow.sdk.transforms.DoFnTester; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.common.collect.ImmutableList; + +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.List; + +/** Unit tests for {@link MaxPerKeyExamples}. */ +@RunWith(JUnit4.class) +public class MaxPerKeyExamplesTest { + + private static final TableRow row1 = new TableRow() + .set("month", "6").set("day", "21") + .set("year", "2014").set("mean_temp", "85.3") + .set("tornado", true); + private static final TableRow row2 = new TableRow() + .set("month", "7").set("day", "20") + .set("year", "2014").set("mean_temp", "75.4") + .set("tornado", false); + private static final TableRow row3 = new TableRow() + .set("month", "6").set("day", "18") + .set("year", "2014").set("mean_temp", "45.3") + .set("tornado", true); + private static final List TEST_ROWS = ImmutableList.of(row1, row2, row3); + + private static final KV kv1 = KV.of(6, 85.3); + private static final KV kv2 = KV.of(6, 45.3); + private static final KV kv3 = KV.of(7, 75.4); + + private static final List> TEST_KVS = ImmutableList.of(kv1, kv2, kv3); + + private static final TableRow resultRow1 = new TableRow() + .set("month", 6) + .set("max_mean_temp", 85.3); + private static final TableRow resultRow2 = new TableRow() + .set("month", 7) + .set("max_mean_temp", 75.4); + + + @Test + public void testExtractTempFn() { + DoFnTester> extractTempFn = + DoFnTester.of(new ExtractTempFn()); + List> results = extractTempFn.processBatch(TEST_ROWS); + Assert.assertThat(results, CoreMatchers.hasItem(kv1)); + Assert.assertThat(results, CoreMatchers.hasItem(kv2)); + Assert.assertThat(results, CoreMatchers.hasItem(kv3)); + } + + @Test + public void testFormatMaxesFn() { + DoFnTester, TableRow> formatMaxesFnFn = + DoFnTester.of(new FormatMaxesFn()); + List results = formatMaxesFnFn.processBatch(TEST_KVS); + Assert.assertThat(results, CoreMatchers.hasItem(resultRow1)); + Assert.assertThat(results, CoreMatchers.hasItem(resultRow2)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java new file mode 100644 index 0000000..209ea52 --- /dev/null +++ 
b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java @@ -0,0 +1,139 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.examples.cookbook; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.cloud.dataflow.examples.cookbook.TriggerExample.ExtractFlowInfo; +import com.google.cloud.dataflow.examples.cookbook.TriggerExample.TotalFlow; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.RunnableOnService; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.DoFnTester; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TimestampedValue; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Arrays; +import java.util.List; + +/** + * Unit Tests for {@link TriggerExample}. + * The results generated by triggers are by definition non-deterministic and hence hard to test. + * The unit test does not test all aspects of the example. 
+ */ +@RunWith(JUnit4.class) +public class TriggerExampleTest { + + private static final String[] INPUT = + {"01/01/2010 00:00:00,1108302,94,E,ML,36,100,29,0.0065,66,9,1,0.001,74.8,1,9,3,0.0028,71,1,9," + + "12,0.0099,67.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,,,,0", "01/01/2010 00:00:00," + + "1100333,5,N,FR,9,0,39,,,9,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,"}; + + private static final List> TIME_STAMPED_INPUT = Arrays.asList( + TimestampedValue.of("01/01/2010 00:00:00,1108302,5,W,ML,36,100,30,0.0065,66,9,1,0.001," + + "74.8,1,9,3,0.0028,71,1,9,12,0.0099,87.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,," + + ",,0", new Instant(60000)), + TimestampedValue.of("01/01/2010 00:00:00,1108302,110,E,ML,36,100,40,0.0065,66,9,1,0.001," + + "74.8,1,9,3,0.0028,71,1,9,12,0.0099,67.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,," + + ",,0", new Instant(1)), + TimestampedValue.of("01/01/2010 00:00:00,1108302,110,E,ML,36,100,50,0.0065,66,9,1," + + "0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,97.4,1,9,13,0.0121,50.0,1,,,,,0,,,,,0" + + ",,,,,0,,,,,0", new Instant(1))); + + private static final TableRow OUT_ROW_1 = new TableRow() + .set("trigger_type", "default") + .set("freeway", "5").set("total_flow", 30) + .set("number_of_records", 1) + .set("isFirst", true).set("isLast", true) + .set("timing", "ON_TIME") + .set("window", "[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)"); + + private static final TableRow OUT_ROW_2 = new TableRow() + .set("trigger_type", "default") + .set("freeway", "110").set("total_flow", 90) + .set("number_of_records", 2) + .set("isFirst", true).set("isLast", true) + .set("timing", "ON_TIME") + .set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)"); + + @Test + public void testExtractTotalFlow() { + DoFnTester> extractFlowInfow = DoFnTester + .of(new ExtractFlowInfo()); + + List> results = extractFlowInfow.processBatch(INPUT); + Assert.assertEquals(results.size(), 1); + Assert.assertEquals(results.get(0).getKey(), "94"); + Assert.assertEquals(results.get(0).getValue(), new Integer(29)); + + List> output = extractFlowInfow.processBatch(""); + Assert.assertEquals(output.size(), 0); + } + + @Test + @Category(RunnableOnService.class) + public void testTotalFlow () { + Pipeline pipeline = TestPipeline.create(); + PCollection> flow = pipeline + .apply(Create.timestamped(TIME_STAMPED_INPUT)) + .apply(ParDo.of(new ExtractFlowInfo())); + + PCollection totalFlow = flow + .apply(Window.>into(FixedWindows.of(Duration.standardMinutes(1)))) + .apply(new TotalFlow("default")); + + PCollection results = totalFlow.apply(ParDo.of(new FormatResults())); + + + DataflowAssert.that(results).containsInAnyOrder(OUT_ROW_1, OUT_ROW_2); + pipeline.run(); + + } + + static class FormatResults extends DoFn { + @Override + public void processElement(ProcessContext c) throws Exception { + TableRow element = c.element(); + TableRow row = new TableRow() + .set("trigger_type", element.get("trigger_type")) + .set("freeway", element.get("freeway")) + .set("total_flow", element.get("total_flow")) + .set("number_of_records", element.get("number_of_records")) + .set("isFirst", element.get("isFirst")) + .set("isLast", element.get("isLast")) + .set("timing", element.get("timing")) + .set("window", element.get("window")); + c.output(row); + } + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml deleted file mode 100644 index 8b17dfe..0000000 --- 
a/examples/pom.xml +++ /dev/null @@ -1,394 +0,0 @@ - - - - - 4.0.0 - - - org.apache.beam - parent - 0.1.0-incubating-SNAPSHOT - ../pom.xml - - - java-examples-all - Apache Beam :: Examples :: Java All - Apache Beam SDK provides a simple, Java-based - interface for processing virtually any size data. This - artifact includes all Apache Beam Java SDK examples. - - jar - - - - DataflowPipelineTests - - true - com.google.cloud.dataflow.sdk.testing.RunnableOnService - both - - - - - - - - maven-compiler-plugin - - - - org.apache.maven.plugins - maven-dependency-plugin - - - - org.apache.maven.plugins - maven-checkstyle-plugin - 2.12 - - - com.puppycrawl.tools - checkstyle - 6.6 - - - - ../checkstyle.xml - true - true - true - false - - - - - check - - - - - - - - org.apache.maven.plugins - maven-source-plugin - 2.4 - - - attach-sources - compile - - jar - - - - attach-test-sources - test-compile - - test-jar - - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - - Apache Beam Examples - Apache Beam Examples - - com.google.cloud.dataflow.examples - -exclude com.google.cloud.dataflow.sdk.runners.worker:com.google.cloud.dataflow.sdk.runners.dataflow:com.google.cloud.dataflow.sdk.util ${dataflow.javadoc_opts} - false - true - ]]> - - - - - https://cloud.google.com/dataflow/java-sdk/JavaDoc/ - ${basedir}/../javadoc/dataflow-sdk-docs - - - - https://developers.google.com/api-client-library/java/google-api-java-client/reference/1.20.0/ - ${basedir}/../javadoc/apiclient-docs - - - http://avro.apache.org/docs/1.7.7/api/java/ - ${basedir}/../javadoc/avro-docs - - - https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/ - ${basedir}/../javadoc/bq-docs - - - https://cloud.google.com/datastore/docs/apis/javadoc/ - ${basedir}/../javadoc/datastore-docs - - - http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/ - ${basedir}/../javadoc/guava-docs - - - http://fasterxml.github.io/jackson-annotations/javadoc/2.7/ - ${basedir}/../javadoc/jackson-annotations-docs - - - http://fasterxml.github.io/jackson-databind/javadoc/2.7/ - ${basedir}/../javadoc/jackson-databind-docs - - - http://www.joda.org/joda-time/apidocs - ${basedir}/../javadoc/joda-docs - - - https://developers.google.com/api-client-library/java/google-oauth-java-client/reference/1.20.0/ - ${basedir}/../javadoc/oauth-docs - - - - - - - jar - - package - - - - - - org.apache.maven.plugins - maven-shade-plugin - 2.4.1 - - - package - - shade - - - ${project.artifactId}-bundled-${project.version} - - - *:* - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - default-jar - - jar - - - - default-test-jar - - test-jar - - - - - - - - org.jacoco - jacoco-maven-plugin - - - - - - - org.apache.beam - java-sdk-all - ${project.version} - - - - com.google.api-client - google-api-client - ${google-clients.version} - - - - com.google.guava - guava-jdk5 - - - - - - com.google.apis - google-api-services-dataflow - ${dataflow.version} - - - - com.google.guava - guava-jdk5 - - - - - - com.google.apis - google-api-services-bigquery - ${bigquery.version} - - - - com.google.guava - guava-jdk5 - - - - - - com.google.http-client - google-http-client - ${google-clients.version} - - - - com.google.guava - guava-jdk5 - - - - - - org.apache.avro - avro - ${avro.version} - - - - com.google.apis - google-api-services-datastore-protobuf - ${datastore.version} - - - - com.google.guava - guava-jdk5 - - - - - - com.google.apis - 
google-api-services-pubsub - ${pubsub.version} - - - - com.google.guava - guava-jdk5 - - - - - - com.google.guava - guava - ${guava.version} - - - - com.google.code.findbugs - jsr305 - ${jsr305.version} - - - - joda-time - joda-time - ${joda.version} - - - - org.slf4j - slf4j-api - ${slf4j.version} - - - - org.slf4j - slf4j-jdk14 - ${slf4j.version} - runtime - - - - javax.servlet - javax.servlet-api - 3.1.0 - - - - - - org.hamcrest - hamcrest-all - ${hamcrest.version} - - - - junit - junit - ${junit.version} - - - - org.mockito - mockito-all - 1.10.19 - test - - - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java b/examples/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java deleted file mode 100644 index 8823dbc..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.examples; - -import com.google.cloud.dataflow.examples.WordCount.WordCountOptions; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.Sum; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.List; -import java.util.regex.Pattern; - - -/** - * An example that verifies word counts in Shakespeare and includes Dataflow best practices. - * - *

This class, {@link DebuggingWordCount}, is the third in a series of four successively more - * detailed 'word count' examples. You may first want to take a look at {@link MinimalWordCount} - * and {@link WordCount}. After you've looked at this example, then see the - * {@link WindowedWordCount} pipeline, for introduction of additional concepts. - * - *

Basic concepts, also in the MinimalWordCount and WordCount examples: - * Reading text files; counting a PCollection; executing a Pipeline both locally - * and using the Dataflow service; defining DoFns. - * - *

New Concepts: - *

- *   1. Logging to Cloud Logging
- *   2. Controlling Dataflow worker log levels
- *   3. Creating a custom aggregator
- *   4. Testing your Pipeline via DataflowAssert
- * 
- * - *

To execute this pipeline locally, specify general pipeline configuration: - *

{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * 
- * - *

To execute this pipeline using the Dataflow service and the additional logging discussed - * below, specify pipeline configuration: - *

{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- *   --workerLogLevelOverrides={"com.google.cloud.dataflow.examples":"DEBUG"}
- * }
- * 
- * - *

Note that when you run via mvn exec, you may need to escape - * the quotations as appropriate for your shell. For example, in bash: - *

- * mvn compile exec:java ... \
- *   -Dexec.args="... \
- *     --workerLogLevelOverrides={\\\"com.google.cloud.dataflow.examples\\\":\\\"DEBUG\\\"}"
- * 
- * - *

Concept #2: Dataflow workers which execute user code are configured to log to Cloud - * Logging by default at "INFO" log level and higher. One may override log levels for specific - * logging namespaces by specifying: - *


- *   --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}
- * 
- * For example, by specifying: - *

- *   --workerLogLevelOverrides={"com.google.cloud.dataflow.examples":"DEBUG"}
- * 
- * when executing this pipeline using the Dataflow service, Cloud Logging would contain only - * "DEBUG" or higher level logs for the {@code com.google.cloud.dataflow.examples} package in - * addition to the default "INFO" or higher level logs. In addition, the default Dataflow worker - * logging configuration can be overridden by specifying - * {@code --defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>}. For example, - * by specifying {@code --defaultWorkerLogLevel=DEBUG} when executing this pipeline with - * the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs. Note - * that changing the default worker log level to TRACE or DEBUG will significantly increase - * the amount of logs output. - * - *
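A minimal sketch of how Concepts #1-#3 combine in practice, mirroring the FilterTextFn defined below (the class name LoggingFilterFn and the match on "Flourish" are illustrative only): the DoFn logs each match at DEBUG, so the --workerLogLevelOverrides flag above decides whether those lines reach Cloud Logging, and a custom aggregator counts matches for the Monitoring UI.

    import com.google.cloud.dataflow.sdk.transforms.Aggregator;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.Sum;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggingFilterFn extends DoFn<String, String> {
      // Concept #1: the logger is named after the DoFn class, so its output can be
      // targeted by --workerLogLevelOverrides={"...LoggingFilterFn":"DEBUG"}.
      private static final Logger LOG = LoggerFactory.getLogger(LoggingFilterFn.class);

      // Concept #3: a custom aggregator, surfaced in the Dataflow Monitoring UI.
      private final Aggregator<Long, Long> matched =
          createAggregator("matched", new Sum.SumLongFn());

      @Override
      public void processElement(ProcessContext c) {
        if (c.element().contains("Flourish")) {
          LOG.debug("Matched: {}", c.element());  // visible only at DEBUG or lower
          matched.addValue(1L);
          c.output(c.element());
        }
      }
    }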

The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be - * overridden with {@code --inputFile}. - */ -public class DebuggingWordCount { - /** A DoFn that filters for a specific key based upon a regular expression. */ - public static class FilterTextFn extends DoFn, KV> { - /** - * Concept #1: The logger below uses the fully qualified class name of FilterTextFn - * as the logger. All log statements emitted by this logger will be referenced by this name - * and will be visible in the Cloud Logging UI. Learn more at https://cloud.google.com/logging - * about the Cloud Logging UI. - */ - private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class); - - private final Pattern filter; - public FilterTextFn(String pattern) { - filter = Pattern.compile(pattern); - } - - /** - * Concept #3: A custom aggregator can track values in your pipeline as it runs. Those - * values will be displayed in the Dataflow Monitoring UI when this pipeline is run using the - * Dataflow service. These aggregators below track the number of matched and unmatched words. - * Learn more at https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf about - * the Dataflow Monitoring UI. - */ - private final Aggregator matchedWords = - createAggregator("matchedWords", new Sum.SumLongFn()); - private final Aggregator unmatchedWords = - createAggregator("umatchedWords", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (filter.matcher(c.element().getKey()).matches()) { - // Log at the "DEBUG" level each element that we match. When executing this pipeline - // using the Dataflow service, these log lines will appear in the Cloud Logging UI - // only if the log level is set to "DEBUG" or lower. - LOG.debug("Matched: " + c.element().getKey()); - matchedWords.addValue(1L); - c.output(c.element()); - } else { - // Log at the "TRACE" level each element that is not matched. Different log levels - // can be used to control the verbosity of logging providing an effective mechanism - // to filter less important information. - LOG.trace("Did not match: " + c.element().getKey()); - unmatchedWords.addValue(1L); - } - } - } - - public static void main(String[] args) { - WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() - .as(WordCountOptions.class); - Pipeline p = Pipeline.create(options); - - PCollection> filteredWords = - p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) - .apply(new WordCount.CountWords()) - .apply(ParDo.of(new FilterTextFn("Flourish|stomach"))); - - /** - * Concept #4: DataflowAssert is a set of convenient PTransforms in the style of - * Hamcrest's collection matchers that can be used when writing Pipeline level tests - * to validate the contents of PCollections. DataflowAssert is best used in unit tests - * with small data sets but is demonstrated here as a teaching tool. - * - *

Below we verify that the set of filtered words matches our expected counts. Note - * that DataflowAssert does not provide any output and that successful completion of the - * Pipeline implies that the expectations were met. Learn more at - * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to test - * your Pipeline and see {@link DebuggingWordCountTest} for an example unit test. - */ - List> expectedResults = Arrays.asList( - KV.of("Flourish", 3L), - KV.of("stomach", 1L)); - DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java b/examples/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java deleted file mode 100644 index 4ed0520..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.examples; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.MapElements; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.SimpleFunction; -import com.google.cloud.dataflow.sdk.values.KV; - - -/** - * An example that counts words in Shakespeare. - * - *

This class, {@link MinimalWordCount}, is the first in a series of four successively more - * detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or - * argument processing, and focus on construction of the pipeline, which chains together the - * application of core transforms. - * - *

Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally - * the {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional - * concepts. - * - *

Concepts: - *

- *   1. Reading data from text files
- *   2. Specifying 'inline' transforms
- *   3. Counting a PCollection
- *   4. Writing data to Cloud Storage as text files
- * 
- * - *

To execute this pipeline, first edit the code to set your project ID, the staging - * location, and the output location. The specified GCS bucket(s) must already exist. - * - *
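As a sketch of what those edits look like (this is the same pattern as in main() below), the values are set directly on DataflowPipelineOptions rather than parsed from the command line:

    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    // CHANGE 1/3: your Cloud Platform project ID.
    options.setProject("SET_YOUR_PROJECT_ID_HERE");
    // CHANGE 2/3: a GCS path for staging local files.
    options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
    Pipeline p = Pipeline.create(options);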

Then, run the pipeline as described in the README. It will be deployed and run using the - * Dataflow service. No args are required to run the pipeline. You can see the results in your - * output bucket in the GCS browser. - */ -public class MinimalWordCount { - - public static void main(String[] args) { - // Create a DataflowPipelineOptions object. This object lets us set various execution - // options for our pipeline, such as the associated Cloud Platform project and the location - // in Google Cloud Storage to stage files. - DataflowPipelineOptions options = PipelineOptionsFactory.create() - .as(DataflowPipelineOptions.class); - options.setRunner(BlockingDataflowPipelineRunner.class); - // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud. - options.setProject("SET_YOUR_PROJECT_ID_HERE"); - // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files. - options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY"); - - // Create the Pipeline object with the options we defined above. - Pipeline p = Pipeline.create(options); - - // Apply the pipeline's transforms. - - // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set - // of input text files. TextIO.Read returns a PCollection where each element is one line from - // the input text (a set of Shakespeare's texts). - p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) - // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a - // DoFn (defined in-line) on each element that tokenizes the text line into individual words. - // The ParDo returns a PCollection, where each element is an individual word in - // Shakespeare's collected texts. - .apply(ParDo.named("ExtractWords").of(new DoFn() { - @Override - public void processElement(ProcessContext c) { - for (String word : c.element().split("[^a-zA-Z']+")) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - })) - // Concept #3: Apply the Count transform to our PCollection of individual words. The Count - // transform returns a new PCollection of key/value pairs, where each key represents a unique - // word in the text. The associated value is the occurrence count for that word. - .apply(Count.perElement()) - // Apply a MapElements transform that formats our PCollection of word counts into a printable - // string, suitable for writing to an output file. - .apply("FormatResults", MapElements.via(new SimpleFunction, String>() { - @Override - public String apply(KV input) { - return input.getKey() + ": " + input.getValue(); - } - })) - // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline. - // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of - // formatted strings) to a series of text files in Google Cloud Storage. - // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to. - .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); - - // Run the pipeline. 
- p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java b/examples/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java deleted file mode 100644 index 2adac55..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.examples; - -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.cloud.dataflow.examples.common.DataflowExampleOptions; -import com.google.cloud.dataflow.examples.common.DataflowExampleUtils; -import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions; -import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.PipelineResult; -import com.google.cloud.dataflow.sdk.io.BigQueryIO; -import com.google.cloud.dataflow.sdk.io.PubsubIO; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - - -/** - * An example that counts words in text, and can run over either unbounded or bounded input - * collections. - * - *

This class, {@link WindowedWordCount}, is the last in a series of four successively more - * detailed 'word count' examples. First take a look at {@link MinimalWordCount}, - * {@link WordCount}, and {@link DebuggingWordCount}. - * - *

Basic concepts, also in the MinimalWordCount, WordCount, and DebuggingWordCount examples: - * Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally - * and using the Dataflow service; defining DoFns; creating a custom aggregator; - * user-defined PTransforms; defining PipelineOptions. - * - *

New Concepts: - *

- *   1. Unbounded and bounded pipeline input modes
- *   2. Adding timestamps to data
- *   3. PubSub topics as sources
- *   4. Windowing
- *   5. Re-using PTransforms over windowed PCollections
- *   6. Writing to BigQuery
- * 
- * - *

To execute this pipeline locally, specify general pipeline configuration: - *

{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * 
- * - *

To execute this pipeline using the Dataflow service, specify pipeline configuration: - *

{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * }
- * 
- * - *

Optionally specify the input file path via: - * {@code --inputFile=gs://INPUT_PATH}, - * which defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt}. - * - *

Specify an output BigQuery dataset and optionally, a table for the output. If you don't - * specify the table, one will be created for you using the job name. If you don't specify the - * dataset, a dataset called {@code dataflow-examples} must already exist in your project. - * {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}. - * - *
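A short sketch of where those two flags end up, mirroring the getTableReference() helper further down in this file: they populate a TableReference that tells BigQueryIO.Write which table to create or append to.

    TableReference tableRef = new TableReference();
    tableRef.setProjectId(options.getProject());
    tableRef.setDatasetId(options.getBigQueryDataset());
    tableRef.setTableId(options.getBigQueryTable());
    // Later used as: BigQueryIO.Write.to(tableRef).withSchema(getSchema())
    //     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    //     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)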

Decide whether you want your pipeline to run with 'bounded' (such as files in GCS) or - * 'unbounded' input (such as a PubSub topic). To run with unbounded input, set - * {@code --unbounded=true}. Then, optionally specify the Google Cloud PubSub topic to read from - * via {@code --pubsubTopic=projects/PROJECT_ID/topics/YOUR_TOPIC_NAME}. If the topic does not - * exist, the pipeline will create one for you. It will delete this topic when it terminates. - * The pipeline will automatically launch an auxiliary batch pipeline to populate the given PubSub - * topic with the contents of the {@code --inputFile}, in order to make the example easy to run. - * If you want to use an independently-populated PubSub topic, indicate this by setting - * {@code --inputFile=""}. In that case, the auxiliary pipeline will not be started. - * - *
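A condensed sketch of that switch, taken from the pipeline construction further down in this file (the variable names pipeline and options are the ones used there): the --unbounded flag only changes the root transform, and the same fixed windowing described next is applied in either case.

    PCollection<String> input;
    if (options.isUnbounded()) {
      // Unbounded: stream lines from the Pub/Sub topic; timestamps come from the injector.
      input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic()));
    } else {
      // Bounded: read the GCS file and attach artificial timestamps for windowing.
      input = pipeline
          .apply(TextIO.Read.from(options.getInputFile()))
          .apply(ParDo.of(new AddTimestampFn()));
    }
    PCollection<String> windowedWords = input.apply(
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));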

By default, the pipeline will do fixed windowing, on 1-minute windows. You can - * change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10} - * for 10-minute windows. - */ -public class WindowedWordCount { - private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class); - static final int WINDOW_SIZE = 1; // Default window duration in minutes - - /** - * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for - * this example, for the bounded data case. - * - *

Imagine that many ghosts of Shakespeare are all typing madly at the same time to recreate - * his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a - * 2-hour period. - */ - static class AddTimestampFn extends DoFn { - private static final long RAND_RANGE = 7200000; // 2 hours in ms - - @Override - public void processElement(ProcessContext c) { - // Generate a timestamp that falls somewhere in the past two hours. - long randomTimestamp = System.currentTimeMillis() - - (int) (Math.random() * RAND_RANGE); - /** - * Concept #2: Set the data element with that timestamp. - */ - c.outputWithTimestamp(c.element(), new Instant(randomTimestamp)); - } - } - - /** A DoFn that converts a Word and Count into a BigQuery table row. */ - static class FormatAsTableRowFn extends DoFn, TableRow> { - @Override - public void processElement(ProcessContext c) { - TableRow row = new TableRow() - .set("word", c.element().getKey()) - .set("count", c.element().getValue()) - // include a field for the window timestamp - .set("window_timestamp", c.timestamp().toString()); - c.output(row); - } - } - - /** - * Helper method that defines the BigQuery schema used for the output. - */ - private static TableSchema getSchema() { - List fields = new ArrayList<>(); - fields.add(new TableFieldSchema().setName("word").setType("STRING")); - fields.add(new TableFieldSchema().setName("count").setType("INTEGER")); - fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP")); - TableSchema schema = new TableSchema().setFields(fields); - return schema; - } - - /** - * Concept #6: We'll stream the results to a BigQuery table. The BigQuery output source is one - * that supports both bounded and unbounded data. This is a helper method that creates a - * TableReference from input options, to tell the pipeline where to write its BigQuery results. - */ - private static TableReference getTableReference(Options options) { - TableReference tableRef = new TableReference(); - tableRef.setProjectId(options.getProject()); - tableRef.setDatasetId(options.getBigQueryDataset()); - tableRef.setTableId(options.getBigQueryTable()); - return tableRef; - } - - /** - * Options supported by {@link WindowedWordCount}. - * - *

Inherits standard example configuration options, which allow specification of the BigQuery - * table and the PubSub topic, as well as the {@link WordCount.WordCountOptions} support for - * specification of the input file. - */ - public static interface Options extends WordCount.WordCountOptions, - DataflowExampleOptions, ExamplePubsubTopicOptions, ExampleBigQueryTableOptions { - @Description("Fixed window duration, in minutes") - @Default.Integer(WINDOW_SIZE) - Integer getWindowSize(); - void setWindowSize(Integer value); - - @Description("Whether to run the pipeline with unbounded input") - boolean isUnbounded(); - void setUnbounded(boolean value); - } - - public static void main(String[] args) throws IOException { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setBigQuerySchema(getSchema()); - // DataflowExampleUtils creates the necessary input sources to simplify execution of this - // Pipeline. - DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options, - options.isUnbounded()); - - Pipeline pipeline = Pipeline.create(options); - - /** - * Concept #1: the Dataflow SDK lets us run the same pipeline with either a bounded or - * unbounded input source. - */ - PCollection input; - if (options.isUnbounded()) { - LOG.info("Reading from PubSub."); - /** - * Concept #3: Read from the PubSub topic. A topic will be created if it wasn't - * specified as an argument. The data elements' timestamps will come from the pubsub - * injection. - */ - input = pipeline - .apply(PubsubIO.Read.topic(options.getPubsubTopic())); - } else { - /** Else, this is a bounded pipeline. Read from the GCS file. */ - input = pipeline - .apply(TextIO.Read.from(options.getInputFile())) - // Concept #2: Add an element timestamp, using an artificial time just to show windowing. - // See AddTimestampFn for more detail on this. - .apply(ParDo.of(new AddTimestampFn())); - } - - /** - * Concept #4: Window into fixed windows. The fixed window size for this example defaults to 1 - * minute (you can change this with a command-line option). See the documentation for more - * information on how fixed windows work, and for information on the other types of windowing - * available (e.g., sliding windows). - */ - PCollection windowedWords = input - .apply(Window.into( - FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))); - - /** - * Concept #5: Re-use our existing CountWords transform that does not have knowledge of - * windows over a PCollection containing windowed values. - */ - PCollection> wordCounts = windowedWords.apply(new WordCount.CountWords()); - - /** - * Concept #6: Format the results for a BigQuery table, then write to BigQuery. - * The BigQuery output source supports both bounded and unbounded data. - */ - wordCounts.apply(ParDo.of(new FormatAsTableRowFn())) - .apply(BigQueryIO.Write - .to(getTableReference(options)) - .withSchema(getSchema()) - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); - - PipelineResult result = pipeline.run(); - - /** - * To mock unbounded input from PubSub, we'll now start an auxiliary 'injector' pipeline that - * runs for a limited time, and publishes to the input PubSub topic. - * - * With an unbounded input source, you will need to explicitly shut down this pipeline when you - * are done with it, so that you do not continue to be charged for the instances. 
You can do - * this via a ctrl-C from the command line, or from the developer's console UI for Dataflow - * pipelines. The PubSub topic will also be deleted at this time. - */ - exampleDataflowUtils.mockUnboundedSource(options.getInputFile(), result); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/WordCount.java b/examples/src/main/java/com/google/cloud/dataflow/examples/WordCount.java deleted file mode 100644 index 1086106..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/WordCount.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.examples; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.DefaultValueFactory; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.MapElements; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.SimpleFunction; -import com.google.cloud.dataflow.sdk.transforms.Sum; -import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - - -/** - * An example that counts words in Shakespeare and includes Dataflow best practices. - * - *

This class, {@link WordCount}, is the second in a series of four successively more detailed - * 'word count' examples. You may first want to take a look at {@link MinimalWordCount}. - * After you've looked at this example, then see the {@link DebuggingWordCount} - * pipeline, for introduction of additional concepts. - * - *

For a detailed walkthrough of this example, see - * - * https://cloud.google.com/dataflow/java-sdk/wordcount-example - * - * - *

Basic concepts, also in the MinimalWordCount example: - * Reading text files; counting a PCollection; writing to GCS. - * - *
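Those concepts correspond to the following chain from main() further down in this file:

    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))   // read text files
        .apply(new CountWords())                                           // count a PCollection
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())); // write to GCS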

New Concepts: - *

- *   1. Executing a Pipeline both locally and using the Dataflow service
- *   2. Using ParDo with static DoFns defined out-of-line
- *   3. Building a composite transform
- *   4. Defining your own pipeline options
- * 
- * - *

Concept #1: you can execute this pipeline either locally or using the Dataflow service. - * These are now command-line options and not hard-coded as they were in the MinimalWordCount - * example. - * To execute this pipeline locally, specify general pipeline configuration: - *

{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * 
- * and a local output file or output prefix on GCS: - *
{@code
- *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- * }
- * - *

To execute this pipeline using the Dataflow service, specify pipeline configuration: - *

{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * }
- * 
- * and an output prefix on GCS: - *
{@code
- *   --output=gs://YOUR_OUTPUT_PREFIX
- * }
- * - *
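A brief sketch of how these flags reach the pipeline, following the pattern used in main() below: PipelineOptionsFactory parses the command line into the WordCountOptions interface defined later in this file, and the chosen runner is picked up from --runner.

    WordCountOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);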

The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be - * overridden with {@code --inputFile}. - */ -public class WordCount { - - /** - * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically out- - * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the - * pipeline. - */ - static class ExtractWordsFn extends DoFn { - private final Aggregator emptyLines = - createAggregator("emptyLines", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - /** A SimpleFunction that converts a Word and Count into a printable string. */ - public static class FormatAsTextFn extends SimpleFunction, String> { - @Override - public String apply(KV input) { - return input.getKey() + ": " + input.getValue(); - } - } - - /** - * A PTransform that converts a PCollection containing lines of text into a PCollection of - * formatted word counts. - * - *

Concept #3: This is a custom composite transform that bundles two transforms (ParDo and - * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse, - * modular testing, and an improved monitoring experience. - */ - public static class CountWords extends PTransform, - PCollection>> { - @Override - public PCollection> apply(PCollection lines) { - - // Convert lines of text into individual words. - PCollection words = lines.apply( - ParDo.of(new ExtractWordsFn())); - - // Count the number of times each word occurs. - PCollection> wordCounts = - words.apply(Count.perElement()); - - return wordCounts; - } - } - - /** - * Options supported by {@link WordCount}. - * - *

Concept #4: Defining your own configuration options. Here, you can add your own arguments - * to be processed by the command-line parser, and specify default values for them. You can then - * access the options values in your pipeline code. - * - *
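A minimal sketch of such an option (the interface name MyOptions and the flag --favoriteWord are illustrative only, following the WordCountOptions pattern below): each getter/setter pair becomes a command-line flag, @Description supplies the help text, and @Default provides the value when the flag is omitted.

    public interface MyOptions extends PipelineOptions {
      @Description("A word to highlight in the output")
      @Default.String("Flourish")
      String getFavoriteWord();
      void setFavoriteWord(String value);
    }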

Inherits standard configuration options. - */ - public static interface WordCountOptions extends PipelineOptions { - @Description("Path of the file to read from") - @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") - String getInputFile(); - void setInputFile(String value); - - @Description("Path of the file to write to") - @Default.InstanceFactory(OutputFactory.class) - String getOutput(); - void setOutput(String value); - - /** - * Returns "gs://${YOUR_STAGING_DIRECTORY}/counts.txt" as the default destination. - */ - public static class OutputFactory implements DefaultValueFactory { - @Override - public String create(PipelineOptions options) { - DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); - if (dataflowOptions.getStagingLocation() != null) { - return GcsPath.fromUri(dataflowOptions.getStagingLocation()) - .resolve("counts.txt").toString(); - } else { - throw new IllegalArgumentException("Must specify --output or --stagingLocation"); - } - } - } - - } - - public static void main(String[] args) { - WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() - .as(WordCountOptions.class); - Pipeline p = Pipeline.create(options); - - // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the - // static FormatAsTextFn() to the ParDo transform. - p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) - .apply(new CountWords()) - .apply(MapElements.via(new FormatAsTextFn())) - .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java b/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java deleted file mode 100644 index 606bfb4..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.examples.common; - -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; - -/** - * Options that can be used to configure the Dataflow examples. 
- */ -public interface DataflowExampleOptions extends DataflowPipelineOptions { - @Description("Whether to keep jobs running on the Dataflow service after local process exit") - @Default.Boolean(false) - boolean getKeepJobsRunning(); - void setKeepJobsRunning(boolean keepJobsRunning); - - @Description("Number of workers to use when executing the injector pipeline") - @Default.Integer(1) - int getInjectorNumWorkers(); - void setInjectorNumWorkers(int numWorkers); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java b/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java deleted file mode 100644 index 4dfdd85..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java +++ /dev/null @@ -1,485 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.examples.common; - -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; -import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.Bigquery.Datasets; -import com.google.api.services.bigquery.Bigquery.Tables; -import com.google.api.services.bigquery.model.Dataset; -import com.google.api.services.bigquery.model.DatasetReference; -import com.google.api.services.bigquery.model.Table; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.model.Subscription; -import com.google.api.services.pubsub.model.Topic; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.PipelineResult; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.BigQueryOptions; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; -import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; -import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff; -import com.google.cloud.dataflow.sdk.util.MonitoringUtil; -import com.google.cloud.dataflow.sdk.util.Transport; -import 
com.google.cloud.dataflow.sdk.values.PBegin; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.base.Strings; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import javax.servlet.http.HttpServletResponse; - -/** - * The utility class that sets up and tears down external resources, starts the Google Cloud Pub/Sub - * injector, and cancels the streaming and the injector pipelines once the program terminates. - * - *
It is used to run Dataflow examples, such as TrafficMaxLaneFlow and TrafficRoutes. - */ -public class DataflowExampleUtils { - - private final DataflowPipelineOptions options; - private Bigquery bigQueryClient = null; - private Pubsub pubsubClient = null; - private Dataflow dataflowClient = null; - private Set jobsToCancel = Sets.newHashSet(); - private List pendingMessages = Lists.newArrayList(); - - public DataflowExampleUtils(DataflowPipelineOptions options) { - this.options = options; - } - - /** - * Do resources and runner options setup. - */ - public DataflowExampleUtils(DataflowPipelineOptions options, boolean isUnbounded) - throws IOException { - this.options = options; - setupResourcesAndRunner(isUnbounded); - } - - /** - * Sets up external resources that are required by the example, - * such as Pub/Sub topics and BigQuery tables. - * - * @throws IOException if there is a problem setting up the resources - */ - public void setup() throws IOException { - Sleeper sleeper = Sleeper.DEFAULT; - BackOff backOff = new AttemptBoundedExponentialBackOff(3, 200); - Throwable lastException = null; - try { - do { - try { - setupPubsub(); - setupBigQueryTable(); - return; - } catch (GoogleJsonResponseException e) { - lastException = e; - } - } while (BackOffUtils.next(sleeper, backOff)); - } catch (InterruptedException e) { - // Ignore InterruptedException - } - Throwables.propagate(lastException); - } - - /** - * Set up external resources, and configure the runner appropriately. - */ - public void setupResourcesAndRunner(boolean isUnbounded) throws IOException { - if (isUnbounded) { - options.setStreaming(true); - } - setup(); - setupRunner(); - } - - /** - * Sets up the Google Cloud Pub/Sub topic. - * - *
If the topic doesn't exist, a new topic with the given name will be created. - * - * @throws IOException if there is a problem setting up the Pub/Sub topic - */ - public void setupPubsub() throws IOException { - ExamplePubsubTopicAndSubscriptionOptions pubsubOptions = - options.as(ExamplePubsubTopicAndSubscriptionOptions.class); - if (!pubsubOptions.getPubsubTopic().isEmpty()) { - pendingMessages.add("**********************Set Up Pubsub************************"); - setupPubsubTopic(pubsubOptions.getPubsubTopic()); - pendingMessages.add("The Pub/Sub topic has been set up for this example: " - + pubsubOptions.getPubsubTopic()); - - if (!pubsubOptions.getPubsubSubscription().isEmpty()) { - setupPubsubSubscription( - pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription()); - pendingMessages.add("The Pub/Sub subscription has been set up for this example: " - + pubsubOptions.getPubsubSubscription()); - } - } - } - - /** - * Sets up the BigQuery table with the given schema. - * - *
If the table already exists, the schema has to match the given one. Otherwise, the example - * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema - * will be created. - * - * @throws IOException if there is a problem setting up the BigQuery table - */ - public void setupBigQueryTable() throws IOException { - ExampleBigQueryTableOptions bigQueryTableOptions = - options.as(ExampleBigQueryTableOptions.class); - if (bigQueryTableOptions.getBigQueryDataset() != null - && bigQueryTableOptions.getBigQueryTable() != null - && bigQueryTableOptions.getBigQuerySchema() != null) { - pendingMessages.add("******************Set Up Big Query Table*******************"); - setupBigQueryTable(bigQueryTableOptions.getProject(), - bigQueryTableOptions.getBigQueryDataset(), - bigQueryTableOptions.getBigQueryTable(), - bigQueryTableOptions.getBigQuerySchema()); - pendingMessages.add("The BigQuery table has been set up for this example: " - + bigQueryTableOptions.getProject() - + ":" + bigQueryTableOptions.getBigQueryDataset() - + "." + bigQueryTableOptions.getBigQueryTable()); - } - } - - /** - * Tears down external resources that can be deleted upon the example's completion. - */ - private void tearDown() { - pendingMessages.add("*************************Tear Down*************************"); - ExamplePubsubTopicAndSubscriptionOptions pubsubOptions = - options.as(ExamplePubsubTopicAndSubscriptionOptions.class); - if (!pubsubOptions.getPubsubTopic().isEmpty()) { - try { - deletePubsubTopic(pubsubOptions.getPubsubTopic()); - pendingMessages.add("The Pub/Sub topic has been deleted: " - + pubsubOptions.getPubsubTopic()); - } catch (IOException e) { - pendingMessages.add("Failed to delete the Pub/Sub topic : " - + pubsubOptions.getPubsubTopic()); - } - if (!pubsubOptions.getPubsubSubscription().isEmpty()) { - try { - deletePubsubSubscription(pubsubOptions.getPubsubSubscription()); - pendingMessages.add("The Pub/Sub subscription has been deleted: " - + pubsubOptions.getPubsubSubscription()); - } catch (IOException e) { - pendingMessages.add("Failed to delete the Pub/Sub subscription : " - + pubsubOptions.getPubsubSubscription()); - } - } - } - - ExampleBigQueryTableOptions bigQueryTableOptions = - options.as(ExampleBigQueryTableOptions.class); - if (bigQueryTableOptions.getBigQueryDataset() != null - && bigQueryTableOptions.getBigQueryTable() != null - && bigQueryTableOptions.getBigQuerySchema() != null) { - pendingMessages.add("The BigQuery table might contain the example's output, " - + "and it is not deleted automatically: " - + bigQueryTableOptions.getProject() - + ":" + bigQueryTableOptions.getBigQueryDataset() - + "." + bigQueryTableOptions.getBigQueryTable()); - pendingMessages.add("Please go to the Developers Console to delete it manually." 
- + " Otherwise, you may be charged for its usage."); - } - } - - private void setupBigQueryTable(String projectId, String datasetId, String tableId, - TableSchema schema) throws IOException { - if (bigQueryClient == null) { - bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build(); - } - - Datasets datasetService = bigQueryClient.datasets(); - if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) { - Dataset newDataset = new Dataset().setDatasetReference( - new DatasetReference().setProjectId(projectId).setDatasetId(datasetId)); - datasetService.insert(projectId, newDataset).execute(); - } - - Tables tableService = bigQueryClient.tables(); - Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId)); - if (table == null) { - Table newTable = new Table().setSchema(schema).setTableReference( - new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId)); - tableService.insert(projectId, datasetId, newTable).execute(); - } else if (!table.getSchema().equals(schema)) { - throw new RuntimeException( - "Table exists and schemas do not match, expecting: " + schema.toPrettyString() - + ", actual: " + table.getSchema().toPrettyString()); - } - } - - private void setupPubsubTopic(String topic) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) { - pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute(); - } - } - - private void setupPubsubSubscription(String topic, String subscription) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) { - Subscription subInfo = new Subscription() - .setAckDeadlineSeconds(60) - .setTopic(topic); - pubsubClient.projects().subscriptions().create(subscription, subInfo).execute(); - } - } - - /** - * Deletes the Google Cloud Pub/Sub topic. - * - * @throws IOException if there is a problem deleting the Pub/Sub topic - */ - private void deletePubsubTopic(String topic) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) { - pubsubClient.projects().topics().delete(topic).execute(); - } - } - - /** - * Deletes the Google Cloud Pub/Sub subscription. - * - * @throws IOException if there is a problem deleting the Pub/Sub subscription - */ - private void deletePubsubSubscription(String subscription) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) { - pubsubClient.projects().subscriptions().delete(subscription).execute(); - } - } - - /** - * If this is an unbounded (streaming) pipeline, and both inputFile and pubsub topic are defined, - * start an 'injector' pipeline that publishes the contents of the file to the given topic, first - * creating the topic if necessary. 
- */ - public void startInjectorIfNeeded(String inputFile) { - ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class); - if (pubsubTopicOptions.isStreaming() - && !Strings.isNullOrEmpty(inputFile) - && !Strings.isNullOrEmpty(pubsubTopicOptions.getPubsubTopic())) { - runInjectorPipeline(inputFile, pubsubTopicOptions.getPubsubTopic()); - } - } - - /** - * Do some runner setup: check that the DirectPipelineRunner is not used in conjunction with - * streaming, and if streaming is specified, use the DataflowPipelineRunner. Return the streaming - * flag value. - */ - public void setupRunner() { - if (options.isStreaming() && options.getRunner() != DirectPipelineRunner.class) { - // In order to cancel the pipelines automatically, - // {@literal DataflowPipelineRunner} is forced to be used. - options.setRunner(DataflowPipelineRunner.class); - } - } - - /** - * Runs a batch pipeline to inject data into the PubSubIO input topic. - * - *
The injector pipeline will read from the given text file, and inject data - * into the Google Cloud Pub/Sub topic. - */ - public void runInjectorPipeline(String inputFile, String topic) { - runInjectorPipeline(TextIO.Read.from(inputFile), topic, null); - } - - /** - * Runs a batch pipeline to inject data into the PubSubIO input topic. - * - *
The injector pipeline will read from the given source, and inject data - * into the Google Cloud Pub/Sub topic. - */ - public void runInjectorPipeline(PTransform> readSource, - String topic, - String pubsubTimestampTabelKey) { - PubsubFileInjector.Bound injector; - if (Strings.isNullOrEmpty(pubsubTimestampTabelKey)) { - injector = PubsubFileInjector.publish(topic); - } else { - injector = PubsubFileInjector.withTimestampLabelKey(pubsubTimestampTabelKey).publish(topic); - } - DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class); - if (options.getServiceAccountName() != null) { - copiedOptions.setServiceAccountName(options.getServiceAccountName()); - } - if (options.getServiceAccountKeyfile() != null) { - copiedOptions.setServiceAccountKeyfile(options.getServiceAccountKeyfile()); - } - copiedOptions.setStreaming(false); - copiedOptions.setNumWorkers(options.as(DataflowExampleOptions.class).getInjectorNumWorkers()); - copiedOptions.setJobName(options.getJobName() + "-injector"); - Pipeline injectorPipeline = Pipeline.create(copiedOptions); - injectorPipeline.apply(readSource) - .apply(IntraBundleParallelization - .of(injector) - .withMaxParallelism(20)); - PipelineResult result = injectorPipeline.run(); - if (result instanceof DataflowPipelineJob) { - jobsToCancel.add(((DataflowPipelineJob) result)); - } - } - - /** - * Runs the provided pipeline to inject data into the PubSubIO input topic. - */ - public void runInjectorPipeline(Pipeline injectorPipeline) { - PipelineResult result = injectorPipeline.run(); - if (result instanceof DataflowPipelineJob) { - jobsToCancel.add(((DataflowPipelineJob) result)); - } - } - - /** - * Start the auxiliary injector pipeline, then wait for this pipeline to finish. - */ - public void mockUnboundedSource(String inputFile, PipelineResult result) { - startInjectorIfNeeded(inputFile); - waitToFinish(result); - } - - /** - * If {@literal DataflowPipelineRunner} or {@literal BlockingDataflowPipelineRunner} is used, - * waits for the pipeline to finish and cancels it (and the injector) before the program exists. - */ - public void waitToFinish(PipelineResult result) { - if (result instanceof DataflowPipelineJob) { - final DataflowPipelineJob job = (DataflowPipelineJob) result; - jobsToCancel.add(job); - if (!options.as(DataflowExampleOptions.class).getKeepJobsRunning()) { - addShutdownHook(jobsToCancel); - } - try { - job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.PrintHandler(System.out)); - } catch (Exception e) { - throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId()); - } - } else { - // Do nothing if the given PipelineResult doesn't support waitToFinish(), - // such as EvaluationResults returned by DirectPipelineRunner. 
- tearDown(); - printPendingMessages(); - } - } - - private void addShutdownHook(final Collection jobs) { - if (dataflowClient == null) { - dataflowClient = options.getDataflowClient(); - } - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - tearDown(); - printPendingMessages(); - for (DataflowPipelineJob job : jobs) { - System.out.println("Canceling example pipeline: " + job.getJobId()); - try { - job.cancel(); - } catch (IOException e) { - System.out.println("Failed to cancel the job," - + " please go to the Developers Console to cancel it manually"); - System.out.println( - MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId())); - } - } - - for (DataflowPipelineJob job : jobs) { - boolean cancellationVerified = false; - for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) { - if (job.getState().isTerminal()) { - cancellationVerified = true; - System.out.println("Canceled example pipeline: " + job.getJobId()); - break; - } else { - System.out.println( - "The example pipeline is still running. Verifying the cancellation."); - } - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - // Ignore - } - } - if (!cancellationVerified) { - System.out.println("Failed to verify the cancellation for job: " + job.getJobId()); - System.out.println("Please go to the Developers Console to verify manually:"); - System.out.println( - MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId())); - } - } - } - }); - } - - private void printPendingMessages() { - System.out.println(); - System.out.println("***********************************************************"); - System.out.println("***********************************************************"); - for (String message : pendingMessages) { - System.out.println(message); - } - System.out.println("***********************************************************"); - System.out.println("***********************************************************"); - } - - private static T executeNullIfNotFound( - AbstractGoogleClientRequest request) throws IOException { - try { - return request.execute(); - } catch (GoogleJsonResponseException e) { - if (e.getStatusCode() == HttpServletResponse.SC_NOT_FOUND) { - return null; - } else { - throw e; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExampleBigQueryTableOptions.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExampleBigQueryTableOptions.java b/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExampleBigQueryTableOptions.java deleted file mode 100644 index 7c213b5..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExampleBigQueryTableOptions.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.examples.common; - -import com.google.api.services.bigquery.model.TableSchema; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.DefaultValueFactory; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; - -/** - * Options that can be used to configure BigQuery tables in Dataflow examples. - * The project defaults to the project being used to run the example. - */ -public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions { - @Description("BigQuery dataset name") - @Default.String("dataflow_examples") - String getBigQueryDataset(); - void setBigQueryDataset(String dataset); - - @Description("BigQuery table name") - @Default.InstanceFactory(BigQueryTableFactory.class) - String getBigQueryTable(); - void setBigQueryTable(String table); - - @Description("BigQuery table schema") - TableSchema getBigQuerySchema(); - void setBigQuerySchema(TableSchema schema); - - /** - * Returns the job name as the default BigQuery table name. - */ - static class BigQueryTableFactory implements DefaultValueFactory { - @Override - public String create(PipelineOptions options) { - return options.as(DataflowPipelineOptions.class).getJobName() - .replace('-', '_'); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java b/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java deleted file mode 100644 index d7bd4b8..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.examples.common; - -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.DefaultValueFactory; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; - -/** - * Options that can be used to configure Pub/Sub topic/subscription in Dataflow examples. - */ -public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubTopicOptions { - @Description("Pub/Sub subscription") - @Default.InstanceFactory(PubsubSubscriptionFactory.class) - String getPubsubSubscription(); - void setPubsubSubscription(String subscription); - - /** - * Returns a default Pub/Sub subscription based on the project and the job names. 
- */
-  static class PubsubSubscriptionFactory implements DefaultValueFactory<String> {
-    @Override
-    public String create(PipelineOptions options) {
-      DataflowPipelineOptions dataflowPipelineOptions =
-          options.as(DataflowPipelineOptions.class);
-      return "projects/" + dataflowPipelineOptions.getProject()
-          + "/subscriptions/" + dataflowPipelineOptions.getJobName();
-    }
-  }
-}
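(Also not part of this diff: both options interfaces deleted above derive their defaults from the pipeline's job name through DefaultValueFactory implementations. A minimal sketch of the values they produce, assuming the placeholder project and job name below; the getter names and declared defaults come from the interfaces themselves.)

import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

/** Hypothetical snippet; only demonstrates the option defaults described above. */
public class ExampleOptionsDefaultsSketch {
  public static void main(String[] args) {
    ExampleBigQueryTableOptions bq = PipelineOptionsFactory.create()
        .as(ExampleBigQueryTableOptions.class);
    bq.setProject("my-gcp-project");          // placeholder project
    bq.setJobName("traffic-routes-example");  // placeholder job name

    // BigQueryTableFactory turns the job name into a legal table name.
    System.out.println(bq.getBigQueryDataset());  // "dataflow_examples" (declared default)
    System.out.println(bq.getBigQueryTable());    // "traffic_routes_example"

    ExamplePubsubTopicAndSubscriptionOptions pubsub =
        bq.as(ExamplePubsubTopicAndSubscriptionOptions.class);
    // PubsubSubscriptionFactory builds a fully qualified subscription path.
    System.out.println(pubsub.getPubsubSubscription());
    // "projects/my-gcp-project/subscriptions/traffic-routes-example"
  }
}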