beam-commits mailing list archives

From lc...@apache.org
Subject [17/18] incubator-beam git commit: [BEAM-151] Move a large portion of the Dataflow runner to separate maven module
Date Mon, 11 Apr 2016 23:42:51 GMT
[BEAM-151] Move a large portion of the Dataflow runner to separate maven module

Note that the Flink runner and the Java examples both still depend on the
Dataflow runner, so I needed to make the Dataflow runner a top-level module to
satisfy these dependencies.
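
For illustration only, the move is source-compatible for pipeline authors: the
runner classes keep their com.google.cloud.dataflow.sdk packages, and only the
Maven artifact providing them changes. A minimal, hypothetical sketch using the
entry points shown in this diff (not part of the commit):

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;

    public class RunnerSmokeTest {
      public static void main(String[] args) {
        // Parse --project, --stagingLocation, etc. from the command line.
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(DataflowPipelineOptions.class);
        options.setRunner(DataflowPipelineRunner.class);
        // Pipeline construction is unchanged; only the artifact that
        // supplies DataflowPipelineRunner moved in this commit.
        Pipeline p = Pipeline.create(options);
        p.run();
      }
    }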


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b4857cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b4857cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b4857cc

Branch: refs/heads/master
Commit: 6b4857ccc3ffc15c7a825e181ce22c5e8b17ceae
Parents: 1bbb7af
Author: Luke Cwik <lcwik@google.com>
Authored: Mon Apr 11 12:49:07 2016 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Mon Apr 11 16:18:16 2016 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |    6 +
 examples/java8/pom.xml                          |    6 +
 pom.xml                                         |    6 +-
 runners/flink/runner/pom.xml                    |   12 +
 runners/google-cloud-dataflow-java/pom.xml      |  560 ++++
 .../BlockingDataflowPipelineOptions.java        |   50 +
 .../sdk/options/CloudDebuggerOptions.java       |   52 +
 .../options/DataflowPipelineDebugOptions.java   |  253 ++
 .../sdk/options/DataflowPipelineOptions.java    |  115 +
 .../DataflowPipelineWorkerPoolOptions.java      |  258 ++
 .../sdk/options/DataflowProfilingOptions.java   |   48 +
 .../options/DataflowWorkerHarnessOptions.java   |   51 +
 .../options/DataflowWorkerLoggingOptions.java   |  155 +
 .../runners/BlockingDataflowPipelineRunner.java |  185 ++
 .../DataflowJobAlreadyExistsException.java      |   35 +
 .../DataflowJobAlreadyUpdatedException.java     |   34 +
 .../runners/DataflowJobCancelledException.java  |   39 +
 .../sdk/runners/DataflowJobException.java       |   41 +
 .../runners/DataflowJobExecutionException.java  |   35 +
 .../runners/DataflowJobUpdatedException.java    |   52 +
 .../dataflow/sdk/runners/DataflowPipeline.java  |   60 +
 .../sdk/runners/DataflowPipelineJob.java        |  394 +++
 .../sdk/runners/DataflowPipelineRegistrar.java  |   59 +
 .../sdk/runners/DataflowPipelineRunner.java     | 3008 ++++++++++++++++++
 .../runners/DataflowPipelineRunnerHooks.java    |   38 +
 .../sdk/runners/DataflowPipelineTranslator.java | 1105 +++++++
 .../sdk/runners/DataflowServiceException.java   |   33 +
 .../sdk/runners/dataflow/AssignWindows.java     |   90 +
 .../runners/dataflow/BigQueryIOTranslator.java  |  126 +
 .../sdk/runners/dataflow/CustomSources.java     |  119 +
 .../dataflow/DataflowAggregatorTransforms.java  |   80 +
 .../dataflow/DataflowMetricUpdateExtractor.java |  111 +
 .../runners/dataflow/PubsubIOTranslator.java    |  108 +
 .../sdk/runners/dataflow/ReadTranslator.java    |  104 +
 .../sdk/runners/dataflow/package-info.java      |   21 +
 .../testing/TestDataflowPipelineOptions.java    |   27 +
 .../sdk/testing/TestDataflowPipelineRunner.java |  256 ++
 .../sdk/util/DataflowPathValidator.java         |   98 +
 .../dataflow/sdk/util/DataflowTransport.java    |  112 +
 .../cloud/dataflow/sdk/util/GcsStager.java      |   54 +
 .../cloud/dataflow/sdk/util/MonitoringUtil.java |  235 ++
 .../cloud/dataflow/sdk/util/PackageUtil.java    |  328 ++
 .../google/cloud/dataflow/sdk/util/Stager.java  |   30 +
 .../dataflow/sdk/io/DataflowTextIOTest.java     |  118 +
 .../DataflowPipelineDebugOptionsTest.java       |   41 +
 .../options/DataflowPipelineOptionsTest.java    |   92 +
 .../options/DataflowProfilingOptionsTest.java   |   49 +
 .../DataflowWorkerLoggingOptionsTest.java       |   75 +
 .../BlockingDataflowPipelineRunnerTest.java     |  302 ++
 .../sdk/runners/DataflowPipelineJobTest.java    |  605 ++++
 .../runners/DataflowPipelineRegistrarTest.java  |   73 +
 .../sdk/runners/DataflowPipelineRunnerTest.java | 1369 ++++++++
 .../sdk/runners/DataflowPipelineTest.java       |   45 +
 .../runners/DataflowPipelineTranslatorTest.java |  890 ++++++
 .../sdk/runners/dataflow/CustomSourcesTest.java |  274 ++
 .../testing/TestDataflowPipelineRunnerTest.java |  377 +++
 .../sdk/transforms/DataflowGroupByKeyTest.java  |  111 +
 .../sdk/transforms/DataflowViewTest.java        |  206 ++
 .../sdk/util/DataflowPathValidatorTest.java     |   93 +
 .../dataflow/sdk/util/MonitoringUtilTest.java   |  148 +
 .../dataflow/sdk/util/PackageUtilTest.java      |  483 +++
 sdks/java/core/pom.xml                          |   28 -
 .../BlockingDataflowPipelineOptions.java        |   50 -
 .../sdk/options/CloudDebuggerOptions.java       |   52 -
 .../options/DataflowPipelineDebugOptions.java   |  253 --
 .../sdk/options/DataflowPipelineOptions.java    |  115 -
 .../DataflowPipelineWorkerPoolOptions.java      |  258 --
 .../sdk/options/DataflowProfilingOptions.java   |   48 -
 .../options/DataflowWorkerHarnessOptions.java   |   51 -
 .../options/DataflowWorkerLoggingOptions.java   |  155 -
 .../runners/BlockingDataflowPipelineRunner.java |  185 --
 .../DataflowJobAlreadyExistsException.java      |   35 -
 .../DataflowJobAlreadyUpdatedException.java     |   34 -
 .../runners/DataflowJobCancelledException.java  |   39 -
 .../sdk/runners/DataflowJobException.java       |   41 -
 .../runners/DataflowJobExecutionException.java  |   35 -
 .../runners/DataflowJobUpdatedException.java    |   52 -
 .../dataflow/sdk/runners/DataflowPipeline.java  |   60 -
 .../sdk/runners/DataflowPipelineJob.java        |  394 ---
 .../sdk/runners/DataflowPipelineRegistrar.java  |   59 -
 .../sdk/runners/DataflowPipelineRunner.java     | 3008 ------------------
 .../runners/DataflowPipelineRunnerHooks.java    |   38 -
 .../sdk/runners/DataflowPipelineTranslator.java | 1105 -------
 .../sdk/runners/DataflowServiceException.java   |   33 -
 .../sdk/runners/dataflow/AssignWindows.java     |   90 -
 .../runners/dataflow/BigQueryIOTranslator.java  |  126 -
 .../sdk/runners/dataflow/CustomSources.java     |  119 -
 .../dataflow/DataflowAggregatorTransforms.java  |   80 -
 .../dataflow/DataflowMetricUpdateExtractor.java |  111 -
 .../runners/dataflow/PubsubIOTranslator.java    |  108 -
 .../sdk/runners/dataflow/ReadTranslator.java    |  104 -
 .../sdk/runners/dataflow/package-info.java      |   21 -
 .../testing/TestDataflowPipelineOptions.java    |   27 -
 .../sdk/testing/TestDataflowPipelineRunner.java |  256 --
 .../sdk/util/DataflowPathValidator.java         |   98 -
 .../dataflow/sdk/util/DataflowTransport.java    |  112 -
 .../cloud/dataflow/sdk/util/GcsStager.java      |   54 -
 .../cloud/dataflow/sdk/util/MonitoringUtil.java |  235 --
 .../cloud/dataflow/sdk/util/PackageUtil.java    |  328 --
 .../google/cloud/dataflow/sdk/util/Stager.java  |   30 -
 .../dataflow/sdk/io/DataflowTextIOTest.java     |  118 -
 .../DataflowPipelineDebugOptionsTest.java       |   41 -
 .../options/DataflowPipelineOptionsTest.java    |   92 -
 .../options/DataflowProfilingOptionsTest.java   |   49 -
 .../DataflowWorkerLoggingOptionsTest.java       |   75 -
 .../sdk/options/PipelineOptionsFactoryTest.java |   43 +-
 .../BlockingDataflowPipelineRunnerTest.java     |  302 --
 .../sdk/runners/DataflowPipelineJobTest.java    |  605 ----
 .../runners/DataflowPipelineRegistrarTest.java  |   73 -
 .../sdk/runners/DataflowPipelineRunnerTest.java | 1369 --------
 .../sdk/runners/DataflowPipelineTest.java       |   45 -
 .../runners/DataflowPipelineTranslatorTest.java |  890 ------
 .../sdk/runners/dataflow/CustomSourcesTest.java |  274 --
 .../testing/TestDataflowPipelineRunnerTest.java |  377 ---
 .../sdk/transforms/DataflowGroupByKeyTest.java  |  111 -
 .../sdk/transforms/DataflowViewTest.java        |  206 --
 .../sdk/util/DataflowPathValidatorTest.java     |   93 -
 .../dataflow/sdk/util/MonitoringUtilTest.java   |  148 -
 .../dataflow/sdk/util/PackageUtilTest.java      |  483 ---
 .../main/resources/archetype-resources/pom.xml  |    9 +-
 120 files changed, 13975 insertions(+), 13395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index d63ac23..a90973a 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -243,6 +243,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>google-cloud-dataflow-java-runner</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.api-client</groupId>
       <artifactId>google-api-client</artifactId>
       <version>${google-clients.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index c8c4134..822341b 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ -164,6 +164,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>google-cloud-dataflow-java-runner</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <version>${guava.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 591b67a..08073a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,8 +126,12 @@
 
   <modules>
     <module>sdks</module>
+    <!-- Expose Dataflow runner as top level module to satisfy dependencies
+         in sdks/java/maven-archetypes and examples/java. Until these are
+         refactored out, we need to modify the build order. -->
+    <module>runners/google-cloud-dataflow-java</module>
     <module>runners</module>
-    <!-- sdks/java/maven-archtypes has several dependencies on the
+    <!-- sdks/java/maven-archetypes has several dependencies on the
          DataflowPipelineRunner. Until these are refactored out or
          a released artifact exists, we need to modify the build order. -->
     <module>sdks/java/maven-archetypes</module>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 109eb25..7a1011a 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -75,6 +75,18 @@
         </exclusion>
       </exclusions>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>google-cloud-dataflow-java-runner</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <!-- Test scoped -->
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
new file mode 100644
index 0000000..3d62b0e
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -0,0 +1,560 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <!-- We use the Beam parent to allow for runners/flink, examples/java and
+       sdks/java/maven-archetypes to depend on the Dataflow runner. This is
+       until those modules can migrate off of the Dataflow runner classes. -->
+  <parent>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>parent</artifactId>
+      <version>0.1.0-incubating-SNAPSHOT</version>
+      <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>google-cloud-dataflow-java-runner</artifactId>
+
+  <name>Apache Beam :: Runners :: Google Cloud Dataflow</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <beam.version>0.1.0-incubating-SNAPSHOT</beam.version>
+    <timestamp>${maven.build.timestamp}</timestamp>
+    <maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
+    <dataflow>com.google.cloud.dataflow</dataflow>
+    <runIntegrationTestOnService>false</runIntegrationTestOnService>
+    <testParallelValue>none</testParallelValue>
+    <testGroups></testGroups>
+    <dataflowProjectName></dataflowProjectName>
+  </properties>
+
+  <profiles>
+    <profile>
+      <id>DataflowPipelineTests</id>
+      <properties>
+        <runIntegrationTestOnService>true</runIntegrationTestOnService>
+        <testGroups>com.google.cloud.dataflow.sdk.testing.RunnableOnService</testGroups>
+        <testParallelValue>both</testParallelValue>
+      </properties>
+    </profile>
+  </profiles>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+        <filtering>true</filtering>
+      </resource>
+    </resources>
+
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals><goal>analyze-only</goal></goals>
+            <configuration>
+              <failOnWarning>true</failOnWarning>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Run the CheckStyle pass on transforms, as they are released in
+           source form. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>2.12</version>
+        <dependencies>
+          <dependency>
+            <groupId>com.puppycrawl.tools</groupId>
+            <artifactId>checkstyle</artifactId>
+            <version>6.6</version>
+          </dependency>
+        </dependencies>
+        <configuration>
+          <configLocation>../../sdks/java/checkstyle.xml</configLocation>
+          <consoleOutput>true</consoleOutput>
+          <failOnViolation>true</failOnViolation>
+          <includeResources>false</includeResources>
+          <includeTestSourceDirectory>true</includeTestSourceDirectory>
+          <excludes>${project.build.directory}/generated-test-sources/**</excludes>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>default-jar</id>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>default-test-jar</id>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Source plugin for generating source and test-source JARs. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <id>attach-sources</id>
+            <phase>compile</phase>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>attach-test-sources</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <configuration>
+          <windowtitle>Google Cloud Dataflow Java Runner ${project.version}</windowtitle>
+          <doctitle>Google Cloud Dataflow Runner for Java, version ${project.version}</doctitle>
+
+          <subpackages>com.google.cloud.dataflow.sdk</subpackages>
+          <additionalparam>-exclude com.google.cloud.dataflow.sdk.runners.worker:com.google.cloud.dataflow.sdk.runners.dataflow:com.google.cloud.dataflow.sdk.util:com.google.cloud.dataflow.sdk.runners.inprocess ${dataflow.javadoc_opts}</additionalparam>
+          <use>false</use>
+          <quiet>true</quiet>
+          <bottom><![CDATA[<br>]]></bottom>
+
+          <offlineLinks>
+            <offlineLink>
+              <url>https://developers.google.com/api-client-library/java/google-api-java-client/reference/1.20.0/</url>
+              <location>${basedir}/../../sdks/java/javadoc/apiclient-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>http://avro.apache.org/docs/1.7.7/api/java/</url>
+              <location>${basedir}/../../sdks/java/javadoc/avro-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/</url>
+              <location>${basedir}/../../sdks/java/javadoc/bq-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>https://cloud.google.com/datastore/docs/apis/javadoc/</url>
+              <location>${basedir}/../../sdks/java/javadoc/datastore-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>http://docs.guava-libraries.googlecode.com/git-history/release19/javadoc/</url>
+              <location>${basedir}/../../sdks/java/javadoc/guava-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>http://hamcrest.org/JavaHamcrest/javadoc/1.3/</url>
+              <location>${basedir}/../../sdks/java/javadoc/hamcrest-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>http://fasterxml.github.io/jackson-annotations/javadoc/2.7/</url>
+              <location>${basedir}/../../sdks/java/javadoc/jackson-annotations-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>http://fasterxml.github.io/jackson-databind/javadoc/2.7/</url>
+              <location>${basedir}/../../sdks/java/javadoc/jackson-databind-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>http://www.joda.org/joda-time/apidocs</url>
+              <location>${basedir}/../../sdks/java/javadoc/joda-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>http://junit.sourceforge.net/javadoc/</url>
+              <location>${basedir}/../../sdks/java/javadoc/junit-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>https://developers.google.com/api-client-library/java/google-oauth-java-client/reference/1.20.0/</url>
+              <location>${basedir}/../../sdks/java/javadoc/oauth-docs</location>
+            </offlineLink>
+          </offlineLinks>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+            <phase>package</phase>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.4.1</version>
+        <executions>
+          <!-- In the first phase, we pick dependencies and relocate them. -->
+          <execution>
+            <id>bundle-and-repackage</id>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadeTestJar>true</shadeTestJar>
+              <artifactSet>
+                <includes>
+                  <include>com.google.cloud.bigtable:bigtable-client-core</include>
+                  <include>com.google.guava:guava</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <relocations>
+                <!-- TODO: Once ready, change the following pattern to 'com'
+                     only, exclude 'com.google.cloud.dataflow.**', and remove
+                     the second relocation. -->
+                <relocation>
+                  <pattern>com.google.common</pattern>
+                  <shadedPattern>com.google.cloud.dataflow.sdk.repackaged.com.google.common</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google.thirdparty</pattern>
+                  <shadedPattern>com.google.cloud.dataflow.sdk.repackaged.com.google.thirdparty</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google.cloud.bigtable</pattern>
+                  <shadedPattern>com.google.cloud.dataflow.sdk.repackaged.com.google.cloud.bigtable</shadedPattern>
+                  <excludes>
+                    <exclude>com.google.cloud.bigtable.config.BigtableOptions*</exclude>
+                    <exclude>com.google.cloud.bigtable.config.CredentialOptions*</exclude>
+                    <exclude>com.google.cloud.bigtable.config.RetryOptions*</exclude>
+                    <exclude>com.google.cloud.bigtable.grpc.BigtableClusterName</exclude>
+                    <exclude>com.google.cloud.bigtable.grpc.BigtableTableName</exclude>
+                  </excludes>
+                </relocation>
+              </relocations>
+            </configuration>
+          </execution>
+
+          <!-- In the second phase, we pick remaining dependencies and bundle
+               them without repackaging. -->
+          <execution>
+            <id>bundle-rest-without-repackaging</id>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadeTestJar>true</shadeTestJar>
+              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
+              <artifactSet>
+                <excludes>
+                  <exclude>com.google.cloud.bigtable:bigtable-client-core</exclude>
+                  <exclude>com.google.guava:guava</exclude>
+                </excludes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Coverage analysis for unit tests. -->
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.api-client</groupId>
+      <artifactId>google-api-client</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.oauth-client</groupId>
+      <artifactId>google-oauth-client</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client-jackson2</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client-protobuf</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-dataflow</artifactId>
+      <version>${dataflow.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-clouddebugger</artifactId>
+      <version>${clouddebugger.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-bigquery</artifactId>
+      <version>${bigquery.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud.bigdataoss</groupId>
+      <artifactId>util</artifactId>
+      <version>1.4.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <!-- If updating version, please update the javadoc offlineLink -->
+      <version>${guava.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava-testlib</artifactId>
+      <version>${guava.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>${joda.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+      <version>${jsr305.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+
+    <!-- build dependencies -->
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <version>1.0-rc2</version>
+      <optional>true</optional>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>${hamcrest.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.10.19</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+      <version>${beam.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud.dataflow</groupId>
+      <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
+      <version>0.5.160304</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
new file mode 100644
index 0000000..6bbafdd
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.io.PrintStream;
+
+/**
+ * Options that are used to configure the {@link BlockingDataflowPipelineRunner}.
+ */
+@Description("Configure options on the BlockingDataflowPipelineRunner.")
+public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions {
+  /**
+   * Output stream for job status messages.
+   */
+  @Description("Where messages generated during execution of the Dataflow job will be output.")
+  @JsonIgnore
+  @Hidden
+  @Default.InstanceFactory(StandardOutputFactory.class)
+  PrintStream getJobMessageOutput();
+  void setJobMessageOutput(PrintStream value);
+
+  /**
+   * Returns a default of {@link System#out}.
+   */
+  public static class StandardOutputFactory implements DefaultValueFactory<PrintStream> {
+    @Override
+    public PrintStream create(PipelineOptions options) {
+      return System.out;
+    }
+  }
+}
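
An illustrative fragment (not part of this commit, assuming the standard
PipelineOptionsFactory and java.io imports) showing how the option above might
be used: job status messages default to System.out via StandardOutputFactory,
and a caller can redirect them, for example to capture output in a test.

    BlockingDataflowPipelineOptions options =
        PipelineOptionsFactory.as(BlockingDataflowPipelineOptions.class);
    // Capture job status messages in memory instead of writing to System.out.
    ByteArrayOutputStream captured = new ByteArrayOutputStream();
    options.setJobMessageOutput(new PrintStream(captured));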

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
new file mode 100644
index 0000000..6f1551d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import com.google.api.services.clouddebugger.v2.model.Debuggee;
+import com.google.cloud.dataflow.sdk.annotations.Experimental;
+
+import javax.annotation.Nullable;
+
+/**
+ * Options for controlling Cloud Debugger.
+ */
+@Description("[Experimental] Used to configure the Cloud Debugger")
+@Experimental
+@Hidden
+public interface CloudDebuggerOptions {
+
+  /** Whether to enable the Cloud Debugger snapshot agent for the current job. */
+  @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
+  boolean getEnableCloudDebugger();
+  void setEnableCloudDebugger(boolean enabled);
+
+  /** The Cloud Debugger debuggee to associate with. This should not be set directly. */
+  @Description("The Cloud Debugger debuggee to associate with. This should not be set directly.")
+  @Hidden
+  @Nullable Debuggee getDebuggee();
+  void setDebuggee(Debuggee debuggee);
+
+  /** The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. */
+  @Description(
+      "The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. "
+      + "Should be a double between 0 and 1. "
+      + "Snapshots will be cancelled if evaluating conditions takes more than this ratio of time.")
+  @Default.Double(0.01)
+  double getMaxConditionCost();
+  void setMaxConditionCost(double maxConditionCost);
+}
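
Since CloudDebuggerOptions does not itself extend PipelineOptions, it is
obtained through an options interface that mixes it in, such as
DataflowPipelineOptions (added later in this commit). A hedged, illustrative
fragment:

    DataflowPipelineOptions options =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setEnableCloudDebugger(true);
    // Allow snapshot condition evaluation up to 5% of CPU time;
    // the @Default.Double above is 0.01, i.e. 1%.
    options.setMaxConditionCost(0.05);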

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
new file mode 100644
index 0000000..6231bd4
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.cloud.dataflow.sdk.annotations.Experimental;
+import com.google.cloud.dataflow.sdk.util.DataflowPathValidator;
+import com.google.cloud.dataflow.sdk.util.DataflowTransport;
+import com.google.cloud.dataflow.sdk.util.GcsStager;
+import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
+import com.google.cloud.dataflow.sdk.util.PathValidator;
+import com.google.cloud.dataflow.sdk.util.Stager;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Internal. Options used to control execution of the Dataflow SDK for
+ * debugging and testing purposes.
+ */
+@Description("[Internal] Options used to control execution of the Dataflow SDK for "
+    + "debugging and testing purposes.")
+@Hidden
+public interface DataflowPipelineDebugOptions extends PipelineOptions {
+
+  /**
+   * The list of backend experiments to enable.
+   *
+   * <p>Dataflow provides a number of experimental features that can be enabled
+   * with this flag.
+   *
+   * <p>Please sync with the Dataflow team before enabling any experiments.
+   */
+  @Description("[Experimental] Dataflow provides a number of experimental features that can "
+      + "be enabled with this flag. Please sync with the Dataflow team before enabling any "
+      + "experiments.")
+  @Experimental
+  List<String> getExperiments();
+  void setExperiments(List<String> value);
+
+  /**
+   * The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value
+   * if it contains an absolute URL, otherwise {@code apiRootUrl} will be combined with
+   * {@code dataflowEndpoint} to generate the full URL to communicate with the Dataflow API.
+   */
+  @Description("The root URL for the Dataflow API. dataflowEndpoint can override this "
+      + "value if it contains an absolute URL, otherwise apiRootUrl will be combined with "
+      + "dataflowEndpoint to generate the full URL to communicate with the Dataflow API.")
+  @Default.String(Dataflow.DEFAULT_ROOT_URL)
+  String getApiRootUrl();
+  void setApiRootUrl(String value);
+
+  /**
+   * Dataflow endpoint to use.
+   *
+   * <p>Defaults to the current version of the Google Cloud Dataflow
+   * API, at the time the current SDK version was released.
+   *
+   * <p>If the string contains "://", then this is treated as a URL,
+   * otherwise {@link #getApiRootUrl()} is used as the root
+   * URL.
+   */
+  @Description("The URL for the Dataflow API. If the string contains \"://\", this"
+      + " will be treated as the entire URL, otherwise will be treated relative to apiRootUrl.")
+  @Default.String(Dataflow.DEFAULT_SERVICE_PATH)
+  String getDataflowEndpoint();
+  void setDataflowEndpoint(String value);
+
+  /**
+   * The path to write the translated Dataflow job specification out to
+   * at job submission time. The Dataflow job specification will be represented in JSON
+   * format.
+   */
+  @Description("The path to write the translated Dataflow job specification out to "
+      + "at job submission time. The Dataflow job specification will be represented in JSON "
+      + "format.")
+  String getDataflowJobFile();
+  void setDataflowJobFile(String value);
+
+  /**
+   * The class of the validator that should be created and used to validate paths.
+   * If pathValidator has not been set explicitly, an instance of this class will be
+   * constructed and used as the path validator.
+   */
+  @Description("The class of the validator that should be created and used to validate paths. "
+      + "If pathValidator has not been set explicitly, an instance of this class will be "
+      + "constructed and used as the path validator.")
+  @Default.Class(DataflowPathValidator.class)
+  Class<? extends PathValidator> getPathValidatorClass();
+  void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
+
+  /**
+   * The path validator instance that should be used to validate paths.
+   * If no path validator has been set explicitly, the default is to use the instance factory that
+   * constructs a path validator based upon the currently set pathValidatorClass.
+   */
+  @JsonIgnore
+  @Description("The path validator instance that should be used to validate paths. "
+      + "If no path validator has been set explicitly, the default is to use the instance factory "
+      + "that constructs a path validator based upon the currently set pathValidatorClass.")
+  @Default.InstanceFactory(PathValidatorFactory.class)
+  PathValidator getPathValidator();
+  void setPathValidator(PathValidator validator);
+
+  /**
+   * The class responsible for staging resources to be accessible by workers
+   * during job execution. If stager has not been set explicitly, an instance of this class
+   * will be created and used as the resource stager.
+   */
+  @Description("The class of the stager that should be created and used to stage resources. "
+      + "If stager has not been set explicitly, an instance of the this class will be created "
+      + "and used as the resource stager.")
+  @Default.Class(GcsStager.class)
+  Class<? extends Stager> getStagerClass();
+  void setStagerClass(Class<? extends Stager> stagerClass);
+
+  /**
+   * The resource stager instance that should be used to stage resources.
+   * If no stager has been set explicitly, the default is to use the instance factory
+   * that constructs a resource stager based upon the currently set stagerClass.
+   */
+  @JsonIgnore
+  @Description("The resource stager instance that should be used to stage resources. "
+      + "If no stager has been set explicitly, the default is to use the instance factory "
+      + "that constructs a resource stager based upon the currently set stagerClass.")
+  @Default.InstanceFactory(StagerFactory.class)
+  Stager getStager();
+  void setStager(Stager stager);
+
+  /**
+   * An instance of the Dataflow client. Defaults to creating a Dataflow client
+   * using the current set of options.
+   */
+  @JsonIgnore
+  @Description("An instance of the Dataflow client. Defaults to creating a Dataflow client "
+      + "using the current set of options.")
+  @Default.InstanceFactory(DataflowClientFactory.class)
+  Dataflow getDataflowClient();
+  void setDataflowClient(Dataflow value);
+
+  /** Returns the default Dataflow client built from the passed in PipelineOptions. */
+  public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
+    @Override
+    public Dataflow create(PipelineOptions options) {
+      return DataflowTransport.newDataflowClient(
+          options.as(DataflowPipelineOptions.class)).build();
+    }
+  }
+
+  /**
+   * Whether to update the currently running pipeline with the same name as this one.
+   *
+   * @deprecated This property is replaced by {@link DataflowPipelineOptions#getUpdate()}
+   */
+  @Deprecated
+  @Description("If set, replace the existing pipeline with the name specified by --jobName with "
+      + "this pipeline, preserving state.")
+  boolean getUpdate();
+  @Deprecated
+  void setUpdate(boolean value);
+
+  /**
+   * Mapping of old PTransform names to new ones, specified as JSON
+   * <code>{"oldName":"newName",...}</code>. To mark a transform as deleted, make newName the
+   * empty string.
+   */
+  @JsonIgnore
+  @Description(
+      "Mapping of old PTranform names to new ones, specified as JSON "
+      + "{\"oldName\":\"newName\",...}. To mark a transform as deleted, make newName the empty "
+      + "string.")
+  Map<String, String> getTransformNameMapping();
+  void setTransformNameMapping(Map<String, String> value);
+
+  /**
+   * Custom windmill_main binary to use with the streaming runner.
+   */
+  @Description("Custom windmill_main binary to use with the streaming runner")
+  String getOverrideWindmillBinary();
+  void setOverrideWindmillBinary(String value);
+
+  /**
+   * Number of threads to use on the Dataflow worker harness. If left unspecified,
+   * the Dataflow service will compute an appropriate number of threads to use.
+   */
+  @Description("Number of threads to use on the Dataflow worker harness. If left unspecified, "
+      + "the Dataflow service will compute an appropriate number of threads to use.")
+  int getNumberOfWorkerHarnessThreads();
+  void setNumberOfWorkerHarnessThreads(int value);
+
+  /**
+   * If {@literal true}, save a heap dump before killing a thread or process which is GC
+   * thrashing or out of memory. The location of the heap file will either be echoed back
+   * to the user, or the user will be given the opportunity to download the heap file.
+   *
+   * <p>CAUTION: Heap dumps can be of comparable size to the default boot disk. Consider
+   * increasing the boot disk size before setting this flag to true.
+   */
+  @Description("If {@literal true}, save a heap dump before killing a thread or process "
+      + "which is GC thrashing or out of memory.")
+  boolean getDumpHeapOnOOM();
+  void setDumpHeapOnOOM(boolean dumpHeapBeforeExit);
+
+  /**
+   * Creates a {@link PathValidator} object using the class specified in
+   * {@link #getPathValidatorClass()}.
+   */
+  public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
+    @Override
+    public PathValidator create(PipelineOptions options) {
+      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
+      return InstanceBuilder.ofType(PathValidator.class)
+          .fromClass(debugOptions.getPathValidatorClass())
+          .fromFactoryMethod("fromOptions")
+          .withArg(PipelineOptions.class, options)
+          .build();
+    }
+  }
+
+  /**
+   * Creates a {@link Stager} object using the class specified in
+   * {@link #getStagerClass()}.
+   */
+  public static class StagerFactory implements DefaultValueFactory<Stager> {
+    @Override
+    public Stager create(PipelineOptions options) {
+      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
+      return InstanceBuilder.ofType(Stager.class)
+          .fromClass(debugOptions.getStagerClass())
+          .fromFactoryMethod("fromOptions")
+          .withArg(PipelineOptions.class, options)
+          .build();
+    }
+  }
+}
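
Worth noting from PathValidatorFactory and StagerFactory above: because
InstanceBuilder is invoked with fromFactoryMethod("fromOptions"), any class
supplied via pathValidatorClass or stagerClass must expose a static
fromOptions(PipelineOptions) factory method. A hedged fragment, where
MyGcsPathValidator is a hypothetical class following that contract:

    DataflowPipelineDebugOptions debugOptions =
        PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class);
    // MyGcsPathValidator (hypothetical) must declare:
    //   public static MyGcsPathValidator fromOptions(PipelineOptions options)
    // because PathValidatorFactory builds it reflectively.
    debugOptions.setPathValidatorClass(MyGcsPathValidator.class);
    PathValidator validator = debugOptions.getPathValidator();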

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
new file mode 100644
index 0000000..dbfafd1
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import com.google.cloud.dataflow.sdk.runners.DataflowPipeline;
+import com.google.common.base.MoreObjects;
+
+import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * Options that can be used to configure the {@link DataflowPipeline}.
+ */
+@Description("Options that configure the Dataflow pipeline.")
+public interface DataflowPipelineOptions extends
+    PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
+    DataflowPipelineWorkerPoolOptions, BigQueryOptions,
+    GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
+    DataflowProfilingOptions, PubsubOptions {
+
+  @Description("Project id. Required when running a Dataflow in the cloud. "
+      + "See https://cloud.google.com/storage/docs/projects for further details.")
+  @Override
+  @Validation.Required
+  @Default.InstanceFactory(DefaultProjectFactory.class)
+  String getProject();
+  @Override
+  void setProject(String value);
+
+  /**
+   * GCS path for staging local files, e.g. gs://bucket/object
+   *
+   * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
+   *
+   * <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()}
+   * must be set. If {@link #getStagingLocation()} is not set, then the Dataflow
+   * pipeline defaults to using {@link PipelineOptions#getTempLocation()}.
+   */
+  @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". "
+      + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
+      + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
+      + "defaults to using tempLocation.")
+  String getStagingLocation();
+  void setStagingLocation(String value);
+
+  /**
+   * The Dataflow job name is used as an idempotence key within the Dataflow service.
+   * While an existing job with the same name is active, another job with that
+   * name cannot be created. Defaults to ApplicationName-UserName-Date.
+   */
+  @Description("The Dataflow job name is used as an idempotence key within the Dataflow service. "
+      + "If there is an existing job that is currently active, another active job with the same "
+      + "name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.")
+  @Default.InstanceFactory(JobNameFactory.class)
+  String getJobName();
+  void setJobName(String value);
+
+  /**
+   * Whether to update the currently running pipeline with the same name as this one.
+   */
+  @Override
+  @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
+  @Description(
+      "If set, replace the existing pipeline with the name specified by --jobName with "
+          + "this pipeline, preserving state.")
+  boolean getUpdate();
+  @Override
+  @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
+  void setUpdate(boolean value);
+
+  /**
+   * Returns a normalized job name constructed from {@link ApplicationNameOptions#getAppName()}, the
+   * local system user name (if available), and the current time. The normalization makes sure that
+   * the job name matches the required pattern of [a-z]([-a-z0-9]*[a-z0-9])? and length limit of 40
+   * characters.
+   *
+   * <p>This job name factory is only able to generate one unique name per second per application
+   * and user combination.
+   */
+  public static class JobNameFactory implements DefaultValueFactory<String> {
+    private static final DateTimeFormatter FORMATTER =
+        DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
+
+    @Override
+    public String create(PipelineOptions options) {
+      String appName = options.as(ApplicationNameOptions.class).getAppName();
+      String normalizedAppName = appName == null || appName.length() == 0 ? "dataflow"
+          : appName.toLowerCase()
+                   .replaceAll("[^a-z0-9]", "0")
+                   .replaceAll("^[^a-z]", "a");
+      String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
+      String normalizedUserName = userName.toLowerCase()
+                                          .replaceAll("[^a-z0-9]", "0");
+      String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
+      return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
+    }
+  }
+}
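
A worked example of the JobNameFactory normalization above, with hypothetical
inputs:

    // appName "WordCount"   -> "wordcount"
    // user.name "jane.doe"  -> "jane0doe" (non-[a-z0-9] characters become '0')
    // UTC time Apr 11, 23:42:51 -> "0411234251" (pattern "MMddHHmmss")
    // resulting job name: "wordcount-jane0doe-0411234251"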

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
new file mode 100644
index 0000000..0c6428f
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import com.google.cloud.dataflow.sdk.annotations.Experimental;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.List;
+
+/**
+ * Options that are used to configure the Dataflow pipeline worker pool.
+ */
+@Description("Options that are used to configure the Dataflow pipeline worker pool.")
+public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions {
+  /**
+   * Number of workers to use when executing the Dataflow job. Note that selecting an
+   * autoscaling algorithm other than {@code NONE} will affect the size of the worker pool. If
+   * left unspecified, the Dataflow service will determine the number of workers.
+   */
+  @Description("Number of workers to use when executing the Dataflow job. Note that "
+      + "selecting an autoscaling algorithm other than \"NONE\" will affect the "
+      + "size of the worker pool. If left unspecified, the Dataflow service will "
+      + "determine the number of workers.")
+  int getNumWorkers();
+  void setNumWorkers(int value);
+
+  /**
+   * Type of autoscaling algorithm to use.
+   */
+  @Experimental(Experimental.Kind.AUTOSCALING)
+  public enum AutoscalingAlgorithmType {
+    /** Use numWorkers machines. Do not autoscale the worker pool. */
+    NONE("AUTOSCALING_ALGORITHM_NONE"),
+
+    /** @deprecated use {@link #THROUGHPUT_BASED} instead; it maps to the same service value. */
+    @Deprecated
+    BASIC("AUTOSCALING_ALGORITHM_BASIC"),
+
+    /** Autoscale the worker pool based on throughput (up to maxNumWorkers). */
+    THROUGHPUT_BASED("AUTOSCALING_ALGORITHM_BASIC");
+
+    private final String algorithm;
+
+    private AutoscalingAlgorithmType(String algorithm) {
+      this.algorithm = algorithm;
+    }
+
+    /** Returns the string representation of this type. */
+    public String getAlgorithm() {
+      return this.algorithm;
+    }
+  }
+
+  /**
+   * [Experimental] The autoscaling algorithm to use for the worker pool.
+   *
+   * <ul>
+   *   <li>NONE: does not change the size of the worker pool.</li>
+   *   <li>BASIC (deprecated): autoscale the worker pool size up to maxNumWorkers until the job
+   *   completes.</li>
+   *   <li>THROUGHPUT_BASED: autoscale the worker pool based on throughput (up to maxNumWorkers).
+   *   </li>
+   * </ul>
+   */
+  @Description("[Experimental] The autoscaling algorithm to use for the worker pool. "
+      + "NONE: does not change the size of the worker pool. "
+      + "BASIC (deprecated): autoscale the worker pool size up to maxNumWorkers until the job "
+      + "completes. "
+      + "THROUGHPUT_BASED: autoscale the worker pool based on throughput (up to maxNumWorkers).")
+  @Experimental(Experimental.Kind.AUTOSCALING)
+  AutoscalingAlgorithmType getAutoscalingAlgorithm();
+  void setAutoscalingAlgorithm(AutoscalingAlgorithmType value);
+
+  /**
+   * The maximum number of workers to use for the worker pool. This option limits the size of
+   * the worker pool for the lifetime of the job, including
+   * <a href="https://cloud.google.com/dataflow/pipelines/updating-a-pipeline">pipeline updates</a>.
+   * If left unspecified, the Dataflow service will compute a ceiling.
+   */
+  @Description("The maximum number of workers to use for the worker pool. This option limits "
+      + "the size of the worker pool for the lifetime of the job, including pipeline updates. "
+      + "If left unspecified, the Dataflow service will compute a ceiling.")
+  int getMaxNumWorkers();
+  void setMaxNumWorkers(int value);
+
+  /**
+   * Remote worker disk size, in gigabytes, or 0 to use the default size.
+   */
+  @Description("Remote worker disk size, in gigabytes, or 0 to use the default size.")
+  int getDiskSizeGb();
+  void setDiskSizeGb(int value);
+
+  /**
+   * Docker container image that executes the Dataflow worker harness, residing in Google
+   * Container Registry.
+   */
+  @Default.InstanceFactory(WorkerHarnessContainerImageFactory.class)
+  @Description("Docker container image that executes the Dataflow worker harness, residing in "
+      + "Google Container Registry.")
+  @Hidden
+  String getWorkerHarnessContainerImage();
+  void setWorkerHarnessContainerImage(String value);
+
+  /**
+   * Returns the default Docker container image that executes the Dataflow worker harness,
+   * residing in Google Container Registry.
+   */
+  public static class WorkerHarnessContainerImageFactory
+      implements DefaultValueFactory<String> {
+    @Override
+    public String create(PipelineOptions options) {
+      DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+      if (dataflowOptions.isStreaming()) {
+        return DataflowPipelineRunner.STREAMING_WORKER_HARNESS_CONTAINER_IMAGE;
+      } else {
+        return DataflowPipelineRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE;
+      }
+    }
+  }
+
+  /**
+   * GCE <a href="https://cloud.google.com/compute/docs/networking">network</a> for launching
+   * workers.
+   *
+   * <p>Default is up to the Dataflow service.
+   */
+  @Description("GCE network for launching workers. For more information, see the reference "
+      + "documentation https://cloud.google.com/compute/docs/networking. "
+      + "Default is up to the Dataflow service.")
+  String getNetwork();
+  void setNetwork(String value);
+
+  /**
+   * GCE <a href="https://cloud.google.com/compute/docs/networking">subnetwork</a> for launching
+   * workers.
+   *
+   * <p>Default is up to the Dataflow service. Expected format is
+   * regions/REGION/subnetworks/SUBNETWORK.
+   *
+   * <p>You may also need to specify the network option.
+   */
+  @Description("GCE subnetwork for launching workers. For more information, see the reference "
+      + "documentation https://cloud.google.com/compute/docs/networking. "
+      + "Expected format is regions/REGION/subnetworks/SUBNETWORK. "
+      + "Default is up to the Dataflow service.")
+  String getSubnetwork();
+  void setSubnetwork(String value);
+
+  /**
+   * GCE <a href="https://developers.google.com/compute/docs/zones"
+   * >availability zone</a> for launching workers.
+   *
+   * <p>Default is up to the Dataflow service.
+   */
+  @Description("GCE availability zone for launching workers. See "
+      + "https://developers.google.com/compute/docs/zones for a list of valid options. "
+      + "Default is up to the Dataflow service.")
+  String getZone();
+  void setZone(String value);
+
+  /**
+   * Machine type to create Dataflow worker VMs as.
+   *
+   * <p>See <a href="https://cloud.google.com/compute/docs/machine-types">GCE machine types</a>
+   * for a list of valid options.
+   *
+   * <p>If unset, the Dataflow service will choose a reasonable default.
+   */
+  @Description("Machine type to create Dataflow worker VMs as. See "
+      + "https://cloud.google.com/compute/docs/machine-types for a list of valid options. "
+      + "If unset, the Dataflow service will choose a reasonable default.")
+  String getWorkerMachineType();
+  void setWorkerMachineType(String value);
+
+  /**
+   * The policy for tearing down the workers spun up by the service.
+   */
+  public enum TeardownPolicy {
+    /**
+     * All VMs created for a Dataflow job are deleted when the job finishes, regardless of whether
+     * it fails or succeeds.
+     */
+    TEARDOWN_ALWAYS("TEARDOWN_ALWAYS"),
+    /**
+     * All VMs created for a Dataflow job are left running when the job finishes, regardless of
+     * whether it fails or succeeds.
+     */
+    TEARDOWN_NEVER("TEARDOWN_NEVER"),
+    /**
+     * All VMs created for a Dataflow job are deleted when the job succeeds, but are left running
+     * when it fails. (This is typically used for debugging failing jobs by SSHing into the
+     * workers.)
+     */
+    TEARDOWN_ON_SUCCESS("TEARDOWN_ON_SUCCESS");
+
+    private final String teardownPolicy;
+
+    private TeardownPolicy(String teardownPolicy) {
+      this.teardownPolicy = teardownPolicy;
+    }
+
+    public String getTeardownPolicyName() {
+      return this.teardownPolicy;
+    }
+  }
+
+  /**
+   * The teardown policy for the VMs.
+   *
+   * <p>If unset, the Dataflow service will choose a reasonable default.
+   */
+  @Description("The teardown policy for the VMs. If unset, the Dataflow service will "
+      + "choose a reasonable default.")
+  TeardownPolicy getTeardownPolicy();
+  void setTeardownPolicy(TeardownPolicy value);
+
+  /**
+   * List of local files to make available to workers.
+   *
+   * <p>Files are placed on the worker's classpath.
+   *
+   * <p>The default value is the list of jars from the main program's classpath.
+   */
+  @Description("Files to stage on GCS and make available to workers. "
+      + "Files are placed on the worker's classpath. "
+      + "The default value is all files from the classpath.")
+  @JsonIgnore
+  List<String> getFilesToStage();
+  void setFilesToStage(List<String> value);
+
+  /**
+   * Specifies what type of persistent disk should be used. The value should be a full or partial
+   * URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For
+   * more information, see the
+   * <a href="https://cloud.google.com/compute/docs/reference/latest/diskTypes">API reference
+   * documentation for DiskTypes</a>.
+   */
+  @Description("Specifies what type of persistent disk should be used. The value should be a full "
+      + "or partial URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For "
+      + "more information, see the API reference documentation for DiskTypes: "
+      + "https://cloud.google.com/compute/docs/reference/latest/diskTypes")
+  String getWorkerDiskType();
+  void setWorkerDiskType(String value);
+}
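
Usage sketch (illustrative, not part of this diff): wiring a few of the worker
pool options above. The concrete values are placeholders, not recommendations.

    import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
    import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
    import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions.TeardownPolicy;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class WorkerPoolSketch {
      public static void main(String[] args) {
        DataflowPipelineWorkerPoolOptions pool =
            PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
        pool.setNumWorkers(3);       // initial worker count
        pool.setMaxNumWorkers(10);   // autoscaling ceiling for the lifetime of the job
        pool.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
        pool.setWorkerMachineType("n1-standard-4");  // placeholder machine type
        pool.setZone("us-central1-f");               // placeholder zone
        pool.setTeardownPolicy(TeardownPolicy.TEARDOWN_ON_SUCCESS);  // keep failed VMs for debugging
      }
    }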

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
new file mode 100644
index 0000000..f14b04d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import com.google.cloud.dataflow.sdk.annotations.Experimental;
+
+import java.util.HashMap;
+
+/**
+ * Options for controlling profiling of pipeline execution.
+ */
+@Description("[Experimental] Used to configure profiling of the Dataflow pipeline")
+@Experimental
+@Hidden
+public interface DataflowProfilingOptions {
+
+  @Description("Whether to periodically dump profiling information to local disk.\n"
+      + "WARNING: Enabling this option may fill local disk with profiling information.")
+  boolean getEnableProfilingAgent();
+  void setEnableProfilingAgent(boolean enabled);
+
+  @Description(
+      "[INTERNAL] Additional configuration for the profiling agent. Not typically necessary.")
+  @Hidden
+  DataflowProfilingAgentConfiguration getProfilingAgentConfiguration();
+  void setProfilingAgentConfiguration(DataflowProfilingAgentConfiguration configuration);
+
+  /**
+   * Configuration for the profiling agent.
+   */
+  public static class DataflowProfilingAgentConfiguration extends HashMap<String, Object> {
+  }
+}
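
Usage sketch (illustrative, not part of this diff): the options framework
surfaces each getter/setter pair as a command-line flag, so the experimental
toggle above can be passed as --enableProfilingAgent=true. This assumes
DataflowProfilingOptions is mixed into DataflowPipelineOptions, as in the
released SDK.

    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class ProfilingFlagSketch {
      public static void main(String[] args) {
        // Parse the flag reflectively; no other options are validated here.
        DataflowPipelineOptions options = PipelineOptionsFactory
            .fromArgs(new String[] {"--enableProfilingAgent=true"})
            .as(DataflowPipelineOptions.class);
      }
    }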

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
new file mode 100644
index 0000000..7705b66
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+/**
+ * Options that are used exclusively within the Dataflow worker harness.
+ * These options have no effect at pipeline creation time.
+ */
+@Description("[Internal] Options that are used exclusively within the Dataflow worker harness. "
+    + "These options have no effect at pipeline creation time.")
+@Hidden
+public interface DataflowWorkerHarnessOptions extends DataflowPipelineOptions {
+  /**
+   * The identity of the worker running this pipeline.
+   */
+  @Description("The identity of the worker running this pipeline.")
+  String getWorkerId();
+  void setWorkerId(String value);
+
+  /**
+   * The identity of the Dataflow job.
+   */
+  @Description("The identity of the Dataflow job.")
+  String getJobId();
+  void setJobId(String value);
+
+  /**
+   * The size of the worker's in-memory cache, in megabytes.
+   *
+   * <p>Currently, this cache is used for storing read values of side inputs.
+   */
+  @Description("The size of the worker's in-memory cache, in megabytes.")
+  @Default.Integer(100)
+  Integer getWorkerCacheMb();
+  void setWorkerCacheMb(Integer value);
+}
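
Usage sketch (illustrative, not part of this diff): these options are consumed
by the worker harness rather than by user code; the fragment below only shows
how the declared @Default.Integer resolves.

    import com.google.cloud.dataflow.sdk.options.DataflowWorkerHarnessOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class WorkerCacheDefaultSketch {
      public static void main(String[] args) {
        DataflowWorkerHarnessOptions harness =
            PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
        // @Default.Integer(100) resolves on first read when the option is unset.
        System.out.println(harness.getWorkerCacheMb());  // prints 100
      }
    }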

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
new file mode 100644
index 0000000..ebd42d9
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import com.google.common.base.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Options that are used to control logging configuration on the Dataflow worker.
+ */
+@Description("Options that are used to control logging configuration on the Dataflow worker.")
+public interface DataflowWorkerLoggingOptions extends PipelineOptions {
+  /**
+   * The set of log levels that can be used on the Dataflow worker.
+   */
+  public enum Level {
+    DEBUG, ERROR, INFO, TRACE, WARN
+  }
+
+  /**
+   * This option controls the default log level of all loggers without a log level override.
+   */
+  @Description("Controls the default log level of all loggers without a log level override.")
+  @Default.Enum("INFO")
+  Level getDefaultWorkerLogLevel();
+  void setDefaultWorkerLogLevel(Level level);
+
+  /**
+   * This option controls the log levels for specifically named loggers.
+   *
+   * <p>Later options with equivalent names override earlier options.
+   *
+   * <p>See {@link WorkerLogLevelOverrides} for more information on how to configure logging
+   * on a per {@link Class}, {@link Package}, or name basis. If used from the command line,
+   * the expected format is {"Name":"Level",...}; see {@link WorkerLogLevelOverrides#from}
+   * for further details.
+   */
+  @Description("This option controls the log levels for specifically named loggers. "
+      + "The expected format is {\"Name\":\"Level\",...}. The Dataflow worker uses "
+      + "java.util.logging, which supports a logging hierarchy based off of names that are '.' "
+      + "separated. For example, by specifying the value {\"a.b.c.Foo\":\"DEBUG\"}, the logger "
+      + "for the class 'a.b.c.Foo' will be configured to output logs at the DEBUG level. "
+      + "Similarly, by specifying the value {\"a.b.c\":\"WARN\"}, all loggers underneath the "
+      + "'a.b.c' package will be configured to output logs at the WARN level. Also, note that "
+      + "when multiple overrides are specified, the exact name followed by the closest parent "
+      + "takes precedence.")
+  WorkerLogLevelOverrides getWorkerLogLevelOverrides();
+  void setWorkerLogLevelOverrides(WorkerLogLevelOverrides value);
+
+  /**
+   * Defines a log level override for a specific class, package, or name.
+   *
+   * <p>{@code java.util.logging} is used on the Dataflow worker harness and supports
+   * a logging hierarchy based on names that are "." separated. It is a common
+   * pattern to have the logger for a given class share the same name as the class itself.
+   * Given the classes {@code a.b.c.Foo}, {@code a.b.c.Xyz}, and {@code a.b.Bar}, with
+   * loggers named {@code "a.b.c.Foo"}, {@code "a.b.c.Xyz"}, and {@code "a.b.Bar"} respectively,
+   * we can override the log levels:
+   * <ul>
+   *    <li>for {@code Foo} by specifying the name {@code "a.b.c.Foo"} or the {@link Class}
+   *    representing {@code a.b.c.Foo}.
+   *    <li>for {@code Foo}, {@code Xyz}, and {@code Bar} by specifying the name {@code "a.b"} or
+   *    the {@link Package} representing {@code a.b}.
+   *    <li>for {@code Foo} and {@code Bar} by specifying both of their names or classes.
+   * </ul>
+   * Note that by specifying multiple overrides, the exact name followed by the closest parent
+   * takes precedence.
+   */
+  public static class WorkerLogLevelOverrides extends HashMap<String, Level> {
+    /**
+     * Overrides the default log level for the passed in class.
+     *
+     * <p>This is equivalent to calling
+     * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)}
+     * and passing in the {@link Class#getName() class name}.
+     */
+    public WorkerLogLevelOverrides addOverrideForClass(Class<?> klass, Level level) {
+      Preconditions.checkNotNull(klass, "Expected class to be not null.");
+      addOverrideForName(klass.getName(), level);
+      return this;
+    }
+
+    /**
+     * Overrides the default log level for the passed in package.
+     *
+     * <p>This is equivalent to calling
+     * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)}
+     * and passing in the {@link Package#getName() package name}.
+     */
+    public WorkerLogLevelOverrides addOverrideForPackage(Package pkg, Level level) {
+      Preconditions.checkNotNull(pkg, "Expected package to be not null.");
+      addOverrideForName(pkg.getName(), level);
+      return this;
+    }
+
+    /**
+     * Overrides the default log level for the passed in name.
+     *
+     * <p>Note that because of the hierarchical nature of logger names, this will
+     * override the log level of all loggers that have the passed in name or
+     * a parent logger that has the passed in name.
+     */
+    public WorkerLogLevelOverrides addOverrideForName(String name, Level level) {
+      Preconditions.checkNotNull(name, "Expected name to be not null.");
+      Preconditions.checkNotNull(level,
+          "Expected level to be one of %s.", Arrays.toString(Level.values()));
+      put(name, level);
+      return this;
+    }
+
+    /**
+     * Expects a map keyed by logger {@code Name}s with values representing {@code Level}s.
+     * The {@code Name} generally represents the fully qualified Java
+     * {@link Class#getName() class name}, or fully qualified Java
+     * {@link Package#getName() package name}, or custom logger name. The {@code Level}
+     * represents the log level and must be one of {@link Level}.
+     */
+    @JsonCreator
+    public static WorkerLogLevelOverrides from(Map<String, String> values) {
+      Preconditions.checkNotNull(values, "Expected values to be not null.");
+      WorkerLogLevelOverrides overrides = new WorkerLogLevelOverrides();
+      for (Map.Entry<String, String> entry : values.entrySet()) {
+        try {
+          overrides.addOverrideForName(entry.getKey(), Level.valueOf(entry.getValue()));
+        } catch (IllegalArgumentException e) {
+          throw new IllegalArgumentException(String.format(
+              "Unsupported log level '%s' requested for %s. Must be one of %s.",
+              entry.getValue(), entry.getKey(), Arrays.toString(Level.values())), e);
+        }
+      }
+      return overrides;
+    }
+  }
+}
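
Usage sketch (illustrative, not part of this diff): configuring the worker log
levels above, both programmatically and in the JSON form parsed by
WorkerLogLevelOverrides.from; "com.example" is a placeholder logger name.

    import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions;
    import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.Level;
    import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class WorkerLoggingSketch {
      public static void main(String[] args) {
        DataflowWorkerLoggingOptions logging =
            PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
        logging.setDefaultWorkerLogLevel(Level.WARN);
        logging.setWorkerLogLevelOverrides(
            new WorkerLogLevelOverrides()
                .addOverrideForName("com.example", Level.DEBUG));
        // Equivalent command-line form, parsed by the @JsonCreator factory:
        //   --defaultWorkerLogLevel=WARN --workerLogLevelOverrides={"com.example":"DEBUG"}
      }
    }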

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
new file mode 100644
index 0000000..8903181
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult.State;
+import com.google.cloud.dataflow.sdk.annotations.Experimental;
+import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link PipelineRunner} that behaves like {@link DataflowPipelineRunner},
+ * but waits for the launched job to finish.
+ *
+ * <p>Prints out job status updates and console messages while it waits.
+ *
+ * <p>Returns the final job state, or throws an exception if the job
+ * fails or cannot be monitored.
+ *
+ * <h3>Permissions</h3>
+ * <p>When reading from a Dataflow source or writing to a Dataflow sink using
+ * {@code BlockingDataflowPipelineRunner}, the Google Cloud services account and the Google
+ * Compute Engine service account of the GCP project running the Dataflow job will need access
+ * to the corresponding source/sink.
+ *
+ * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
+ * Dataflow Security and Permissions</a> for more details.
+ */
+public class BlockingDataflowPipelineRunner extends
+    PipelineRunner<DataflowPipelineJob> {
+  private static final Logger LOG = LoggerFactory.getLogger(BlockingDataflowPipelineRunner.class);
+
+  // Defaults to an infinite wait period.
+  // TODO: make this configurable after removal of option map.
+  private static final long BUILTIN_JOB_TIMEOUT_SEC = -1L;
+
+  private final DataflowPipelineRunner dataflowPipelineRunner;
+  private final BlockingDataflowPipelineOptions options;
+
+  protected BlockingDataflowPipelineRunner(
+      DataflowPipelineRunner internalRunner,
+      BlockingDataflowPipelineOptions options) {
+    this.dataflowPipelineRunner = internalRunner;
+    this.options = options;
+  }
+
+  /**
+   * Constructs a runner from the provided options.
+   */
+  public static BlockingDataflowPipelineRunner fromOptions(
+      PipelineOptions options) {
+    BlockingDataflowPipelineOptions dataflowOptions =
+        PipelineOptionsValidator.validate(BlockingDataflowPipelineOptions.class, options);
+    DataflowPipelineRunner dataflowPipelineRunner =
+        DataflowPipelineRunner.fromOptions(dataflowOptions);
+
+    return new BlockingDataflowPipelineRunner(dataflowPipelineRunner, dataflowOptions);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws DataflowJobExecutionException if there is an exception during job execution.
+   * @throws DataflowServiceException if there is an exception retrieving information about the job.
+   */
+  @Override
+  public DataflowPipelineJob run(Pipeline p) {
+    final DataflowPipelineJob job = dataflowPipelineRunner.run(p);
+
+    // We ignore the potential race condition here (Ctrl-C after job submission but before the
+    // shutdown hook is registered). Even if we tried to do something smarter (e.g., a
+    // SettableFuture), the run method (which produces the job) could fail or be Ctrl-C'd before
+    // it had returned a job. The display of the command to cancel the job is best-effort anyway;
+    // RPCs could fail, etc. If the user wants to verify that the job was cancelled, they should
+    // look at the job status.
+    Thread shutdownHook = new Thread() {
+      @Override
+      public void run() {
+        LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n"
+            + "To cancel the job in the cloud, run:\n> {}",
+            MonitoringUtil.getGcloudCancelCommand(options, job.getJobId()));
+      }
+    };
+
+    try {
+      Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+      @Nullable
+      State result;
+      try {
+        result = job.waitToFinish(
+            BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS,
+            new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
+      } catch (IOException | InterruptedException ex) {
+        if (ex instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex);
+        throw new DataflowServiceException(
+            job, "Exception caught while retrieving status for job " + job.getJobId(), ex);
+      }
+
+      if (result == null) {
+        throw new DataflowServiceException(
+            job, "Timed out while retrieving status for job " + job.getJobId());
+      }
+
+      LOG.info("Job finished with status {}", result);
+      if (!result.isTerminal()) {
+        throw new IllegalStateException("Expected terminal state for job " + job.getJobId()
+            + ", got " + result);
+      }
+
+      if (result == State.DONE) {
+        return job;
+      } else if (result == State.UPDATED) {
+        DataflowPipelineJob newJob = job.getReplacedByJob();
+        LOG.info("Job {} has been updated and is running as the new job with id {}."
+            + "To access the updated job on the Dataflow monitoring console, please navigate to {}",
+            job.getJobId(),
+            newJob.getJobId(),
+            MonitoringUtil.getJobMonitoringPageURL(newJob.getProjectId(), newJob.getJobId()));
+        throw new DataflowJobUpdatedException(
+            job,
+            String.format("Job %s updated; new job is %s.", job.getJobId(), newJob.getJobId()),
+            newJob);
+      } else if (result == State.CANCELLED) {
+        String message = String.format("Job %s cancelled by user", job.getJobId());
+        LOG.info(message);
+        throw new DataflowJobCancelledException(job, message);
+      } else {
+        throw new DataflowJobExecutionException(job, "Job " + job.getJobId()
+            + " failed with status " + result);
+      }
+    } finally {
+      Runtime.getRuntime().removeShutdownHook(shutdownHook);
+    }
+  }
+
+  @Override
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    return dataflowPipelineRunner.apply(transform, input);
+  }
+
+  /**
+   * Sets callbacks to invoke during execution. See {@link DataflowPipelineRunnerHooks}.
+   */
+  @Experimental
+  public void setHooks(DataflowPipelineRunnerHooks hooks) {
+    this.dataflowPipelineRunner.setHooks(hooks);
+  }
+
+  @Override
+  public String toString() {
+    return "BlockingDataflowPipelineRunner#" + options.getJobName();
+  }
+}
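
Usage sketch (illustrative, not part of this diff): the blocking run contract
implemented above: run() returns only when the job reaches State.DONE and maps
the other terminal states to exceptions. Pipeline construction is elided; args
is whatever the program received.

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
    import com.google.cloud.dataflow.sdk.runners.DataflowJobCancelledException;
    import com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException;
    import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;

    public class BlockingRunSketch {
      public static void main(String[] args) {
        BlockingDataflowPipelineOptions options = PipelineOptionsFactory
            .fromArgs(args).as(BlockingDataflowPipelineOptions.class);
        Pipeline p = Pipeline.create(options);
        // ... apply transforms here ...
        try {
          DataflowPipelineJob job =
              BlockingDataflowPipelineRunner.fromOptions(options).run(p);
          // Reached only when the job finishes in State.DONE.
        } catch (DataflowJobCancelledException e) {
          // Terminal state CANCELLED: a user cancelled the job while we waited.
        } catch (DataflowJobExecutionException e) {
          // The job reached a failed terminal state.
        }
        // A DataflowJobUpdatedException or DataflowServiceException may also
        // escape, per the run() contract above.
      }
    }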


