beam-commits mailing list archives

From dhalp...@apache.org
Subject [17/21] incubator-beam git commit: Reorganize Java packages in the sources of the Google Cloud Dataflow runner
Date Wed, 27 Apr 2016 01:08:49 GMT
Reorganize Java packages in the sources of the Google Cloud Dataflow runner

The Dataflow runner classes move from org.apache.beam.sdk to org.apache.beam.runners.dataflow, with options, testing, util, and internal subpackages for the supporting code.
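
For user code, only the imports change; class names stay the same. A minimal sketch of the migration, assuming a pipeline that runs on the Dataflow service (the MigrationSketch class and its empty transform section are illustrative, not part of this commit):

    // Before this commit:
    //   import org.apache.beam.sdk.options.DataflowPipelineOptions;
    //   import org.apache.beam.sdk.runners.DataflowPipelineRunner;
    // After this commit:
    import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class MigrationSketch {
      public static void main(String[] args) {
        // Parse --project, --stagingLocation, etc. into Dataflow-specific options.
        DataflowPipelineOptions options = PipelineOptionsFactory
            .fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
        options.setRunner(DataflowPipelineRunner.class);

        Pipeline p = Pipeline.create(options);
        // ... apply transforms here ...
        p.run();
      }
    }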


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/02190985
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/02190985
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/02190985

Branch: refs/heads/master
Commit: 021909855fcf6729ce6ccb9b9ff76f1ca5af35db
Parents: 9e19efd
Author: Davor Bonaci <davor@google.com>
Authored: Mon Apr 25 14:16:03 2016 -0700
Committer: Davor Bonaci <davor@google.com>
Committed: Tue Apr 26 17:59:39 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/MinimalWordCount.java  |    4 +-
 .../org/apache/beam/examples/WordCount.java     |    2 +-
 .../examples/common/DataflowExampleOptions.java |    2 +-
 .../examples/common/DataflowExampleUtils.java   |    8 +-
 .../common/ExampleBigQueryTableOptions.java     |    2 +-
 ...xamplePubsubTopicAndSubscriptionOptions.java |    2 +-
 .../common/ExamplePubsubTopicOptions.java       |    2 +-
 .../examples/common/PubsubFileInjector.java     |    2 +-
 .../beam/examples/complete/AutoComplete.java    |    2 +-
 .../examples/complete/StreamingWordExtract.java |    2 +-
 .../examples/complete/TopWikipediaSessions.java |    2 +-
 .../beam/examples/cookbook/DeDupExample.java    |    2 +-
 .../beam/examples/cookbook/TriggerExample.java  |    4 +-
 .../org/apache/beam/examples/WordCountIT.java   |    4 +-
 .../beam/examples/MinimalWordCountJava8.java    |    4 +-
 .../beam/examples/complete/game/GameStats.java  |    2 +-
 .../examples/complete/game/LeaderBoard.java     |    2 +-
 .../beam/runners/flink/examples/TFIDF.java      |    1 +
 .../beam/runners/flink/examples/WordCount.java  |    9 +-
 .../flink/examples/streaming/AutoComplete.java  |   21 +-
 .../flink/examples/streaming/JoinExamples.java  |    6 +-
 .../KafkaWindowedWordCountExample.java          |   11 +-
 .../examples/streaming/WindowedWordCount.java   |   13 +-
 .../runners/flink/FlinkPipelineOptions.java     |    2 +-
 .../beam/runners/flink/FlinkPipelineRunner.java |    4 +-
 .../FlinkBatchPipelineTranslator.java           |    2 +-
 .../FlinkStreamingPipelineTranslator.java       |    2 +-
 runners/google-cloud-dataflow-java/pom.xml      |    4 +-
 .../BlockingDataflowPipelineRunner.java         |  186 ++
 .../DataflowJobAlreadyExistsException.java      |   35 +
 .../DataflowJobAlreadyUpdatedException.java     |   34 +
 .../dataflow/DataflowJobCancelledException.java |   39 +
 .../runners/dataflow/DataflowJobException.java  |   41 +
 .../dataflow/DataflowJobExecutionException.java |   35 +
 .../dataflow/DataflowJobUpdatedException.java   |   51 +
 .../runners/dataflow/DataflowPipelineJob.java   |  397 +++
 .../dataflow/DataflowPipelineRegistrar.java     |   62 +
 .../dataflow/DataflowPipelineRunner.java        | 3025 ++++++++++++++++++
 .../dataflow/DataflowPipelineRunnerHooks.java   |   39 +
 .../dataflow/DataflowPipelineTranslator.java    | 1059 ++++++
 .../dataflow/DataflowServiceException.java      |   33 +
 .../dataflow/internal/AssignWindows.java        |   89 +
 .../dataflow/internal/BigQueryIOTranslator.java |   72 +
 .../dataflow/internal/CustomSources.java        |  121 +
 .../internal/DataflowAggregatorTransforms.java  |   81 +
 .../internal/DataflowMetricUpdateExtractor.java |  111 +
 .../dataflow/internal/PubsubIOTranslator.java   |  108 +
 .../dataflow/internal/ReadTranslator.java       |  105 +
 .../runners/dataflow/internal/package-info.java |   21 +
 .../BlockingDataflowPipelineOptions.java        |   55 +
 .../dataflow/options/CloudDebuggerOptions.java  |   56 +
 .../options/DataflowPipelineDebugOptions.java   |  247 ++
 .../options/DataflowPipelineOptions.java        |  126 +
 .../DataflowPipelineWorkerPoolOptions.java      |  263 ++
 .../options/DataflowProfilingOptions.java       |   50 +
 .../options/DataflowWorkerHarnessOptions.java   |   55 +
 .../options/DataflowWorkerLoggingOptions.java   |  159 +
 .../testing/TestDataflowPipelineOptions.java    |   30 +
 .../testing/TestDataflowPipelineRunner.java     |  273 ++
 .../dataflow/util/DataflowPathValidator.java    |  100 +
 .../dataflow/util/DataflowTransport.java        |  114 +
 .../beam/runners/dataflow/util/GcsStager.java   |   55 +
 .../runners/dataflow/util/MonitoringUtil.java   |  237 ++
 .../beam/runners/dataflow/util/PackageUtil.java |  333 ++
 .../beam/runners/dataflow/util/Stager.java      |   30 +
 .../BlockingDataflowPipelineOptions.java        |   50 -
 .../beam/sdk/options/CloudDebuggerOptions.java  |   53 -
 .../options/DataflowPipelineDebugOptions.java   |  242 --
 .../sdk/options/DataflowPipelineOptions.java    |  115 -
 .../DataflowPipelineWorkerPoolOptions.java      |  258 --
 .../sdk/options/DataflowProfilingOptions.java   |   48 -
 .../options/DataflowWorkerHarnessOptions.java   |   51 -
 .../options/DataflowWorkerLoggingOptions.java   |  155 -
 .../runners/BlockingDataflowPipelineRunner.java |  185 --
 .../DataflowJobAlreadyExistsException.java      |   35 -
 .../DataflowJobAlreadyUpdatedException.java     |   34 -
 .../runners/DataflowJobCancelledException.java  |   39 -
 .../beam/sdk/runners/DataflowJobException.java  |   41 -
 .../runners/DataflowJobExecutionException.java  |   35 -
 .../runners/DataflowJobUpdatedException.java    |   51 -
 .../beam/sdk/runners/DataflowPipelineJob.java   |  395 ---
 .../sdk/runners/DataflowPipelineRegistrar.java  |   60 -
 .../sdk/runners/DataflowPipelineRunner.java     | 3022 -----------------
 .../runners/DataflowPipelineRunnerHooks.java    |   39 -
 .../sdk/runners/DataflowPipelineTranslator.java | 1058 ------
 .../sdk/runners/DataflowServiceException.java   |   33 -
 .../sdk/runners/dataflow/AssignWindows.java     |   89 -
 .../runners/dataflow/BigQueryIOTranslator.java  |   72 -
 .../sdk/runners/dataflow/CustomSources.java     |  121 -
 .../dataflow/DataflowAggregatorTransforms.java  |   81 -
 .../dataflow/DataflowMetricUpdateExtractor.java |  111 -
 .../runners/dataflow/PubsubIOTranslator.java    |  108 -
 .../sdk/runners/dataflow/ReadTranslator.java    |  105 -
 .../beam/sdk/runners/dataflow/package-info.java |   21 -
 .../testing/TestDataflowPipelineOptions.java    |   28 -
 .../sdk/testing/TestDataflowPipelineRunner.java |  271 --
 .../beam/sdk/util/DataflowPathValidator.java    |   99 -
 .../apache/beam/sdk/util/DataflowTransport.java |  113 -
 .../org/apache/beam/sdk/util/GcsStager.java     |   55 -
 .../apache/beam/sdk/util/MonitoringUtil.java    |  236 --
 .../org/apache/beam/sdk/util/PackageUtil.java   |  328 --
 .../java/org/apache/beam/sdk/util/Stager.java   |   30 -
 .../BlockingDataflowPipelineRunnerTest.java     |  302 ++
 .../dataflow/DataflowPipelineJobTest.java       |  608 ++++
 .../dataflow/DataflowPipelineRegistrarTest.java |   75 +
 .../dataflow/DataflowPipelineRunnerTest.java    | 1401 ++++++++
 .../DataflowPipelineTranslatorTest.java         |  967 ++++++
 .../dataflow/internal/CustomSourcesTest.java    |  276 ++
 .../runners/dataflow/io/DataflowTextIOTest.java |  119 +
 .../DataflowPipelineDebugOptionsTest.java       |   43 +
 .../options/DataflowPipelineOptionsTest.java    |   93 +
 .../options/DataflowProfilingOptionsTest.java   |   51 +
 .../DataflowWorkerLoggingOptionsTest.java       |   77 +
 .../testing/TestDataflowPipelineRunnerTest.java |  381 +++
 .../transforms/DataflowGroupByKeyTest.java      |  113 +
 .../dataflow/transforms/DataflowViewTest.java   |  208 ++
 .../util/DataflowPathValidatorTest.java         |   94 +
 .../dataflow/util/MonitoringUtilTest.java       |  151 +
 .../runners/dataflow/util/PackageUtilTest.java  |  486 +++
 .../apache/beam/sdk/io/DataflowTextIOTest.java  |  118 -
 .../DataflowPipelineDebugOptionsTest.java       |   41 -
 .../options/DataflowPipelineOptionsTest.java    |   92 -
 .../options/DataflowProfilingOptionsTest.java   |   49 -
 .../DataflowWorkerLoggingOptionsTest.java       |   77 -
 .../BlockingDataflowPipelineRunnerTest.java     |  302 --
 .../sdk/runners/DataflowPipelineJobTest.java    |  606 ----
 .../runners/DataflowPipelineRegistrarTest.java  |   74 -
 .../sdk/runners/DataflowPipelineRunnerTest.java | 1400 --------
 .../runners/DataflowPipelineTranslatorTest.java |  965 ------
 .../sdk/runners/dataflow/CustomSourcesTest.java |  276 --
 .../testing/TestDataflowPipelineRunnerTest.java |  379 ---
 .../sdk/transforms/DataflowGroupByKeyTest.java  |  110 -
 .../beam/sdk/transforms/DataflowViewTest.java   |  205 --
 .../sdk/util/DataflowPathValidatorTest.java     |   92 -
 .../beam/sdk/util/MonitoringUtilTest.java       |  149 -
 .../apache/beam/sdk/util/PackageUtilTest.java   |  484 ---
 .../apache/beam/sdk/testing/TestPipeline.java   |    2 +-
 .../src/main/java/MinimalWordCount.java         |    4 +-
 .../src/main/java/WindowedWordCount.java        |   10 +-
 .../src/main/java/WordCount.java                |    2 +-
 .../java/common/DataflowExampleOptions.java     |    2 +-
 .../main/java/common/DataflowExampleUtils.java  |    9 +-
 .../common/ExampleBigQueryTableOptions.java     |    5 +-
 .../java/common/ExamplePubsubTopicOptions.java  |    2 +-
 .../main/java/common/PubsubFileInjector.java    |    9 +-
 145 files changed, 13479 insertions(+), 13347 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index 6c33ca6..ec517c4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -17,11 +17,11 @@
  */
 package org.apache.beam.examples;
 
+import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.BlockingDataflowPipelineRunner;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index 9fb9856..364d075 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.examples;
 
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java
index 5c62146..2e8ef3d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.examples.common;
 
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
index 93c2358..7ac71d3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
@@ -17,18 +17,18 @@
  */
 package org.apache.beam.examples.common;
 
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.runners.DataflowPipelineJob;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.transforms.IntraBundleParallelization;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.MonitoringUtil;
 import org.apache.beam.sdk.util.Transport;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
index 647d508..36304a8 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.examples.common;
 
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
index b3da88d..22bcf4e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.examples.common;
 
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
index 17f5b28..603e309 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.examples.common;
 
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
index b71b6f3..82d58b6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.examples.common;
 
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index e1b658b..2f2283c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -20,6 +20,7 @@ package org.apache.beam.examples.complete;
 import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
 import org.apache.beam.examples.common.ExamplePubsubTopicOptions;
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AvroCoder;
@@ -31,7 +32,6 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index 7107e34..5de08da 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -20,6 +20,7 @@ package org.apache.beam.examples.complete;
 import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
 import org.apache.beam.examples.common.ExamplePubsubTopicOptions;
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.BigQueryIO;
@@ -27,7 +28,6 @@ import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index fa587ea..e7090fd 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -17,10 +17,10 @@
  */
 package org.apache.beam.examples.complete;
 
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
index 1d34245..fe2bbc8 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.examples.cookbook;
 
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index 72976e3..28885a7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -22,16 +22,16 @@ import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
 import org.apache.beam.examples.common.ExamplePubsubTopicOptions;
 import org.apache.beam.examples.common.PubsubFileInjector;
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.BigQueryIO;
 import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.GroupByKey;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index a5ad707..56ca98c 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -22,10 +22,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 import org.apache.beam.examples.WordCount.WordCountOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineRunner;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.TestDataflowPipelineRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import com.google.common.base.Joiner;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index 493344e..398d517 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -17,11 +17,11 @@
  */
 package org.apache.beam.examples;
 
+import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.BlockingDataflowPipelineRunner;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.FlatMapElements;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 2d14264..12dfdf9 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -19,13 +19,13 @@ package org.apache.beam.examples.complete.game;
 
 import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 97958b0..a5d9fb9 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -21,6 +21,7 @@ import org.apache.beam.examples.common.DataflowExampleOptions;
 import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
 import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.PubsubIO;
@@ -28,7 +29,6 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 3eb1327..0afde0a 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TupleTag;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index 04e11c1..702fb63 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -25,7 +25,14 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 2bc3490..9d1168b 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -22,17 +22,32 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.Unbounded
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.io.*;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Partition;
 import org.apache.beam.sdk.transforms.Partition.PartitionFn;
-import org.apache.beam.sdk.transforms.windowing.*;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+
 import org.joda.time.Duration;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index 8c8490b..d3e963d 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -29,11 +29,15 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
 import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.*;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
+
 import org.joda.time.Duration;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 95210ad..abb9fea 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -25,10 +25,17 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.*;
-import org.apache.beam.sdk.transforms.windowing.*;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.joda.time.Duration;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index 9f77f8c..e803e6e 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -20,12 +20,19 @@ package org.apache.beam.runners.flink.examples.streaming;
 import org.apache.beam.runners.flink.FlinkPipelineRunner;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.*;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.*;
-import org.apache.beam.sdk.transforms.windowing.*;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index bfb0d6a..8c82abd 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.flink;
 
 
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index bb016fa..a389d7a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -17,11 +17,11 @@
  */
 package org.apache.beam.runners.flink;
 
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
@@ -46,7 +46,7 @@ import java.util.Map;
  * pipeline by first translating them to a Flink Plan and then executing them either locally
  * or on a Flink cluster, depending on the configuration.
  * <p>
- * This is based on {@link org.apache.beam.sdk.runners.DataflowPipelineRunner}.
+ * This is based on {@link org.apache.beam.runners.dataflow.DataflowPipelineRunner}.
  */
 public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 5ce828c..456cf09 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
- * This is based on {@link org.apache.beam.sdk.runners.DataflowPipelineTranslator}
+ * This is based on {@link org.apache.beam.runners.dataflow.DataflowPipelineTranslator}
  */
 public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
index 4359842..ebaf6ba 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
  * {@link org.apache.beam.sdk.values.PCollection}-based job into a
  * {@link org.apache.flink.streaming.api.datastream.DataStream} one.
  *
- * This is based on {@link org.apache.beam.sdk.runners.DataflowPipelineTranslator}
+ * This is based on {@link org.apache.beam.runners.dataflow.DataflowPipelineTranslator}
  * */
 public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 7893975..beb340c 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -177,8 +177,8 @@
           <windowtitle>Google Cloud Dataflow Java Runner ${project.version}</windowtitle>
           <doctitle>Google Cloud Dataflow Runner for Java, version ${project.version}</doctitle>
 
-          <subpackages>org.apache.beam.sdk</subpackages>
-          <additionalparam>-exclude org.apache.beam.sdk.runners.worker:org.apache.beam.sdk.runners.dataflow:org.apache.beam.sdk.util:org.apache.beam.sdk.runners.inprocess ${dataflow.javadoc_opts}</additionalparam>
+          <subpackages>org.apache.beam.runners.dataflow</subpackages>
+          <additionalparam>-exclude org.apache.beam.sdk.runners.dataflow.internal:org.apache.beam.sdk.runners.dataflow.testing:org.apache.beam.sdk.runners.dataflow.util ${dataflow.javadoc_opts}</additionalparam>
           <use>false</use>
           <quiet>true</quiet>
           <bottom><![CDATA[<br>]]></bottom>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunner.java
new file mode 100644
index 0000000..d8ee16a
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunner.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link PipelineRunner} that's like {@link DataflowPipelineRunner}
+ * but that waits for the launched job to finish.
+ *
+ * <p>Prints out job status updates and console messages while it waits.
+ *
+ * <p>Returns the final job state, or throws an exception if the job
+ * fails or cannot be monitored.
+ *
+ * <p><h3>Permissions</h3>
+ * When reading from a Dataflow source or writing to a Dataflow sink using
+ * {@code BlockingDataflowPipelineRunner}, the Google cloud services account and the Google compute
+ * engine service account of the GCP project running the Dataflow Job will need access to the
+ * corresponding source/sink.
+ *
+ * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
+ * Dataflow Security and Permissions</a> for more details.
+ */
+public class BlockingDataflowPipelineRunner extends
+    PipelineRunner<DataflowPipelineJob> {
+  private static final Logger LOG = LoggerFactory.getLogger(BlockingDataflowPipelineRunner.class);
+
+  // Defaults to an infinite wait period.
+  // TODO: make this configurable after removal of option map.
+  private static final long BUILTIN_JOB_TIMEOUT_SEC = -1L;
+
+  private final DataflowPipelineRunner dataflowPipelineRunner;
+  private final BlockingDataflowPipelineOptions options;
+
+  protected BlockingDataflowPipelineRunner(
+      DataflowPipelineRunner internalRunner,
+      BlockingDataflowPipelineOptions options) {
+    this.dataflowPipelineRunner = internalRunner;
+    this.options = options;
+  }
+
+  /**
+   * Constructs a runner from the provided options.
+   */
+  public static BlockingDataflowPipelineRunner fromOptions(
+      PipelineOptions options) {
+    BlockingDataflowPipelineOptions dataflowOptions =
+        PipelineOptionsValidator.validate(BlockingDataflowPipelineOptions.class, options);
+    DataflowPipelineRunner dataflowPipelineRunner =
+        DataflowPipelineRunner.fromOptions(dataflowOptions);
+
+    return new BlockingDataflowPipelineRunner(dataflowPipelineRunner, dataflowOptions);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws DataflowJobExecutionException if there is an exception during job execution.
+   * @throws DataflowServiceException if there is an exception retrieving information about the job.
+   */
+  @Override
+  public DataflowPipelineJob run(Pipeline p) {
+    final DataflowPipelineJob job = dataflowPipelineRunner.run(p);
+
+    // We ignore the potential race condition here (Ctrl-C after job submission but before the
+    // shutdown hook is registered). Even if we tried to do something smarter (eg., SettableFuture)
+    // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a
+    // job. The display of the command to cancel the job is best-effort anyways -- RPC's could fail,
+    // etc. If the user wants to verify the job was cancelled they should look at the job status.
+    Thread shutdownHook = new Thread() {
+      @Override
+      public void run() {
+        LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n"
+            + "To cancel the job in the cloud, run:\n> {}",
+            MonitoringUtil.getGcloudCancelCommand(options, job.getJobId()));
+      }
+    };
+
+    try {
+      Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+      @Nullable
+      State result;
+      try {
+        result = job.waitToFinish(
+            BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS,
+            new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
+      } catch (IOException | InterruptedException ex) {
+        if (ex instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex);
+        throw new DataflowServiceException(
+            job, "Exception caught while retrieving status for job " + job.getJobId(), ex);
+      }
+
+      if (result == null) {
+        throw new DataflowServiceException(
+            job, "Timed out while retrieving status for job " + job.getJobId());
+      }
+
+      LOG.info("Job finished with status {}", result);
+      if (!result.isTerminal()) {
+        throw new IllegalStateException("Expected terminal state for job " + job.getJobId()
+            + ", got " + result);
+      }
+
+      if (result == State.DONE) {
+        return job;
+      } else if (result == State.UPDATED) {
+        DataflowPipelineJob newJob = job.getReplacedByJob();
+        LOG.info("Job {} has been updated and is running as the new job with id {}."
+            + "To access the updated job on the Dataflow monitoring console, please navigate to {}",
+            job.getJobId(),
+            newJob.getJobId(),
+            MonitoringUtil.getJobMonitoringPageURL(newJob.getProjectId(), newJob.getJobId()));
+        throw new DataflowJobUpdatedException(
+            job,
+            String.format("Job %s updated; new job is %s.", job.getJobId(), newJob.getJobId()),
+            newJob);
+      } else if (result == State.CANCELLED) {
+        String message = String.format("Job %s cancelled by user", job.getJobId());
+        LOG.info(message);
+        throw new DataflowJobCancelledException(job, message);
+      } else {
+        throw new DataflowJobExecutionException(job, "Job " + job.getJobId()
+            + " failed with status " + result);
+      }
+    } finally {
+      Runtime.getRuntime().removeShutdownHook(shutdownHook);
+    }
+  }
+
+  @Override
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    return dataflowPipelineRunner.apply(transform, input);
+  }
+
+  /**
+   * Sets callbacks to invoke during execution. See {@link DataflowPipelineRunnerHooks}.
+   */
+  @Experimental
+  public void setHooks(DataflowPipelineRunnerHooks hooks) {
+    this.dataflowPipelineRunner.setHooks(hooks);
+  }
+
+  @Override
+  public String toString() {
+    return "BlockingDataflowPipelineRunner#" + options.getJobName();
+  }
+}
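
As the Javadoc above states, this runner blocks in run() until the service reports a terminal state and maps non-DONE outcomes to exceptions. A hedged usage sketch against the relocated classes (BlockingRunSketch, the empty transform section, and the final print are illustrative only; the cast mirrors the runner's DataflowPipelineJob return type):

    import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner;
    import org.apache.beam.runners.dataflow.DataflowPipelineJob;
    import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class BlockingRunSketch {
      public static void main(String[] args) {
        BlockingDataflowPipelineOptions options = PipelineOptionsFactory
            .fromArgs(args).withValidation().as(BlockingDataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);

        Pipeline p = Pipeline.create(options);
        // ... apply transforms here ...

        // run() returns only once the job reaches a terminal state; DONE yields the
        // job handle, while cancelled, updated, or failed jobs surface as exceptions.
        DataflowPipelineJob job = (DataflowPipelineJob) p.run();
        System.out.println("Job " + job.getJobId() + " reached a terminal state.");
      }
    }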

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobAlreadyExistsException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobAlreadyExistsException.java
new file mode 100644
index 0000000..2b73bf7
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobAlreadyExistsException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+/**
+ * An exception that is thrown if the unique job name constraint of the Dataflow
+ * service is violated because an existing job with the same job name is currently active.
+ * The {@link DataflowPipelineJob} contained within this exception contains information
+ * about the pre-existing job.
+ */
+public class DataflowJobAlreadyExistsException extends DataflowJobException {
+  /**
+   * Create a new {@code DataflowJobAlreadyExistsException} with the specified {@link
+   * DataflowPipelineJob} and message.
+   */
+  public DataflowJobAlreadyExistsException(
+      DataflowPipelineJob job, String message) {
+    super(job, message, null);
+  }
+}
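The Javadoc above describes the unique-job-name constraint; a hedged sketch of handling the collision at submission time follows. It assumes the submission path surfaces this exception directly from run(), which is implied by the class description rather than shown in this hunk.

import org.apache.beam.runners.dataflow.DataflowJobAlreadyExistsException;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;

class NameCollisionSketch {
  // Assumption: run() throws DataflowJobAlreadyExistsException when the requested
  // job name is already held by an active job.
  static void runOrReportCollision(Pipeline p, DataflowPipelineOptions options) {
    try {
      p.run();
    } catch (DataflowJobAlreadyExistsException e) {
      // The exception carries the pre-existing job, so it can be identified to the user.
      DataflowPipelineJob existing = e.getJob();
      System.err.println("Job name " + options.getJobName()
          + " is already in use by active job " + existing.getJobId());
    }
  }
}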

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobAlreadyUpdatedException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobAlreadyUpdatedException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobAlreadyUpdatedException.java
new file mode 100644
index 0000000..be11637
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobAlreadyUpdatedException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+/**
+ * An exception that is thrown if the existing job has already been updated within the Dataflow
+ * service and can no longer be updated. The {@link DataflowPipelineJob} contained within
+ * this exception contains information about the pre-existing updated job.
+ */
+public class DataflowJobAlreadyUpdatedException extends DataflowJobException {
+  /**
+   * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
+   * DataflowPipelineJob} and message.
+   */
+  public DataflowJobAlreadyUpdatedException(
+      DataflowPipelineJob job, String message) {
+    super(job, message, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobCancelledException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobCancelledException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobCancelledException.java
new file mode 100644
index 0000000..de44a19
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobCancelledException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+/**
+ * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was cancelled during execution.
+ */
+public class DataflowJobCancelledException extends DataflowJobException {
+  /**
+   * Create a new {@code DataflowJobCancelledException} with the specified {@link
+   * DataflowPipelineJob} and message.
+   */
+  public DataflowJobCancelledException(DataflowPipelineJob job, String message) {
+    super(job, message, null);
+  }
+
+  /**
+   * Create a new {@code DataflowJobCancelledException} with the specified {@link
+   * DataflowPipelineJob}, message, and cause.
+   */
+  public DataflowJobCancelledException(DataflowPipelineJob job, String message, Throwable cause) {
+    super(job, message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobException.java
new file mode 100644
index 0000000..74c0f80
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import java.util.Objects;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link RuntimeException} that contains information about a {@link DataflowPipelineJob}.
+ */
+public abstract class DataflowJobException extends RuntimeException {
+  private final DataflowPipelineJob job;
+
+  DataflowJobException(DataflowPipelineJob job, String message, @Nullable Throwable cause) {
+    super(message, cause);
+    this.job = Objects.requireNonNull(job);
+  }
+
+  /**
+   * Returns the failed job.
+   */
+  public DataflowPipelineJob getJob() {
+    return job;
+  }
+}
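Since every concrete subclass carries the affected DataflowPipelineJob, a caller can handle the whole hierarchy in one place, for example to point the user at the monitoring console. A minimal sketch, assuming the pipeline submission itself happens elsewhere:

import org.apache.beam.runners.dataflow.DataflowJobException;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.Pipeline;

class JobExceptionHandlingSketch {
  static void runAndLinkToConsole(Pipeline p) {
    try {
      p.run();
    } catch (DataflowJobException e) {
      // getJob() is available on every subclass, regardless of why the job ended.
      DataflowPipelineJob job = e.getJob();
      System.err.println("Job " + job.getJobId() + " did not complete normally; see "
          + MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
      throw e;
    }
  }
}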

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobExecutionException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobExecutionException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobExecutionException.java
new file mode 100644
index 0000000..11b8723
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobExecutionException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import javax.annotation.Nullable;
+
+/**
+ * Signals that a job run by a {@link BlockingDataflowPipelineRunner} failed during execution, and
+ * provides access to the failed job.
+ */
+public class DataflowJobExecutionException extends DataflowJobException {
+  DataflowJobExecutionException(DataflowPipelineJob job, String message) {
+    this(job, message, null);
+  }
+
+  DataflowJobExecutionException(
+      DataflowPipelineJob job, String message, @Nullable Throwable cause) {
+    super(job, message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobUpdatedException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobUpdatedException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobUpdatedException.java
new file mode 100644
index 0000000..e30a0e7
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobUpdatedException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+/**
+ * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was updated during execution.
+ */
+public class DataflowJobUpdatedException extends DataflowJobException {
+  private DataflowPipelineJob replacedByJob;
+
+  /**
+   * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
+   * DataflowPipelineJob}, message, and replacement {@link DataflowPipelineJob}.
+   */
+  public DataflowJobUpdatedException(
+      DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob) {
+    this(job, message, replacedByJob, null);
+  }
+
+  /**
+   * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
+   * DataflowPipelineJob}, message, replacement {@link DataflowPipelineJob}, and cause.
+   */
+  public DataflowJobUpdatedException(
+      DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob, Throwable cause) {
+    super(job, message, cause);
+    this.replacedByJob = replacedByJob;
+  }
+
+  /**
+   * Returns the new job that replaced the job that terminated with this exception.
+   */
+  public DataflowPipelineJob getReplacedByJob() {
+    return replacedByJob;
+  }
+}
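When this exception is raised by the blocking runner, the replacement job is still running. A small sketch of attaching to that job and waiting for its outcome, assuming the caller simply wants to keep watching (a null message handler is permitted by waitToFinish):

import org.apache.beam.runners.dataflow.DataflowJobUpdatedException;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.sdk.PipelineResult;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

class UpdatedJobSketch {
  static PipelineResult.State followReplacement(DataflowJobUpdatedException e)
      throws IOException, InterruptedException {
    DataflowPipelineJob replacement = e.getReplacedByJob();
    // A wait of less than 1 ms means "wait indefinitely" per waitToFinish's contract.
    return replacement.waitToFinish(-1, TimeUnit.MILLISECONDS, null /* no message handler */);
  }
}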

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
new file mode 100644
index 0000000..19df0a1
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.apache.beam.sdk.util.TimeUtil.fromCloudTime;
+
+import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
+import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.runners.AggregatorRetrievalException;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.util.AttemptAndTimeBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.MapAggregatorValues;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * A DataflowPipelineJob represents a job submitted to Dataflow using
+ * {@link DataflowPipelineRunner}.
+ */
+public class DataflowPipelineJob implements PipelineResult {
+  private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineJob.class);
+
+  /**
+   * The id for the job.
+   */
+  private String jobId;
+
+  /**
+   * Google Cloud project to associate this pipeline with.
+   */
+  private String projectId;
+
+  /**
+   * Client for the Dataflow service. This can be used to query the service
+   * for information about the job.
+   */
+  private Dataflow dataflowClient;
+
+  /**
+   * The state the job terminated in or {@code null} if the job has not terminated.
+   */
+  @Nullable
+  private State terminalState = null;
+
+  /**
+   * The job that replaced this one or {@code null} if the job has not been replaced.
+   */
+  @Nullable
+  private DataflowPipelineJob replacedByJob = null;
+
+  private DataflowAggregatorTransforms aggregatorTransforms;
+
+  /**
+   * The Metric Updates retrieved after the job was in a terminal state.
+   */
+  private List<MetricUpdate> terminalMetricUpdates;
+
+  /**
+   * The polling intervals for job status and job message information.
+   */
+  static final long MESSAGES_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
+  static final long STATUS_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
+
+  /**
+   * The number of polling attempts for job status and job message information.
+   */
+  static final int MESSAGES_POLLING_ATTEMPTS = 10;
+  static final int STATUS_POLLING_ATTEMPTS = 5;
+
+  /**
+   * Constructs the job.
+   *
+   * @param projectId the project id
+   * @param jobId the job id
+   * @param dataflowClient the client for the Dataflow Service
+   * @param aggregatorTransforms the aggregator information for this pipeline, used when
+   *     retrieving aggregator values
+   */
+  public DataflowPipelineJob(String projectId, String jobId, Dataflow dataflowClient,
+      DataflowAggregatorTransforms aggregatorTransforms) {
+    this.projectId = projectId;
+    this.jobId = jobId;
+    this.dataflowClient = dataflowClient;
+    this.aggregatorTransforms = aggregatorTransforms;
+  }
+
+  /**
+   * Get the id of this job.
+   */
+  public String getJobId() {
+    return jobId;
+  }
+
+  /**
+   * Get the project this job exists in.
+   */
+  public String getProjectId() {
+    return projectId;
+  }
+
+  /**
+   * Returns a new {@link DataflowPipelineJob} for the job that replaced this one, if applicable.
+   *
+   * @throws IllegalStateException if called before the job has terminated or if the job terminated
+   * but was not updated
+   */
+  public DataflowPipelineJob getReplacedByJob() {
+    if (terminalState == null) {
+      throw new IllegalStateException("getReplacedByJob() called before job terminated");
+    }
+    if (replacedByJob == null) {
+      throw new IllegalStateException("getReplacedByJob() called for job that was not replaced");
+    }
+    return replacedByJob;
+  }
+
+  /**
+   * Get the Cloud Dataflow API Client used by this job.
+   */
+  public Dataflow getDataflowClient() {
+    return dataflowClient;
+  }
+
+  /**
+   * Waits for the job to finish and returns the final status.
+   *
+   * @param timeToWait The time to wait, in units of {@code timeUnit}, for the job to finish.
+   *     Provide a value less than 1 ms for an infinite wait.
+   * @param timeUnit The unit of time for {@code timeToWait}.
+   * @param messageHandler If non-null, this handler will be invoked for each
+   *   batch of messages received.
+   * @return The final state of the job, or null on timeout or if the
+   *   thread is interrupted.
+   * @throws IOException If there is a persistent problem getting job
+   *   information.
+   * @throws InterruptedException if the thread is interrupted while waiting.
+   */
+  @Nullable
+  public State waitToFinish(
+      long timeToWait,
+      TimeUnit timeUnit,
+      MonitoringUtil.JobMessagesHandler messageHandler)
+          throws IOException, InterruptedException {
+    return waitToFinish(timeToWait, timeUnit, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
+  }
+
+  /**
+   * Waits for the job to finish and returns the final status.
+   *
+   * @param timeToWait The time to wait, in units of {@code timeUnit}, for the job to finish.
+   *     Provide a value less than 1 ms for an infinite wait.
+   * @param timeUnit The unit of time for {@code timeToWait}.
+   * @param messageHandler If non-null, this handler will be invoked for each
+   *   batch of messages received.
+   * @param sleeper A sleeper used to sleep between attempts.
+   * @param nanoClock A clock used to measure the total elapsed time.
+   * @return The final state of the job, or null on timeout or if the
+   *   thread is interrupted.
+   * @throws IOException If there is a persistent problem getting job
+   *   information.
+   * @throws InterruptedException if the thread is interrupted while waiting.
+   */
+  @Nullable
+  @VisibleForTesting
+  State waitToFinish(
+      long timeToWait,
+      TimeUnit timeUnit,
+      MonitoringUtil.JobMessagesHandler messageHandler,
+      Sleeper sleeper,
+      NanoClock nanoClock)
+          throws IOException, InterruptedException {
+    MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowClient);
+
+    long lastTimestamp = 0;
+    BackOff backoff =
+        timeUnit.toMillis(timeToWait) > 0
+            ? new AttemptAndTimeBoundedExponentialBackOff(
+                MESSAGES_POLLING_ATTEMPTS,
+                MESSAGES_POLLING_INTERVAL,
+                timeUnit.toMillis(timeToWait),
+                AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
+                nanoClock)
+            : new AttemptBoundedExponentialBackOff(
+                MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL);
+    State state;
+    do {
+      // Get the state of the job before listing messages. This guarantees that we always fetch
+      // job messages after the job finishes, so that we have all of them.
+      state = getStateWithRetries(1, sleeper);
+      boolean hasError = state == State.UNKNOWN;
+
+      if (messageHandler != null && !hasError) {
+        // Process all the job messages that have accumulated so far.
+        try {
+          List<JobMessage> allMessages = monitor.getJobMessages(
+              jobId, lastTimestamp);
+
+          if (!allMessages.isEmpty()) {
+            lastTimestamp =
+                fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
+            messageHandler.process(allMessages);
+          }
+        } catch (GoogleJsonResponseException | SocketTimeoutException e) {
+          hasError = true;
+          LOG.warn("There were problems getting current job messages: {}.", e.getMessage());
+          LOG.debug("Exception information:", e);
+        }
+      }
+
+      if (!hasError) {
+        backoff.reset();
+        // Check if the job is done.
+        if (state.isTerminal()) {
+          return state;
+        }
+      }
+    } while (BackOffUtils.next(sleeper, backoff));
+    LOG.warn("No terminal state was returned. Last observed state: {}", state);
+    return null;  // Timed out.
+  }
+
+  /**
+   * Cancels the job.
+   * @throws IOException if there is a problem executing the cancel request.
+   */
+  public void cancel() throws IOException {
+    Job content = new Job();
+    content.setProjectId(projectId);
+    content.setId(jobId);
+    content.setRequestedState("JOB_STATE_CANCELLED");
+    dataflowClient.projects().jobs()
+        .update(projectId, jobId, content)
+        .execute();
+  }
+
+  @Override
+  public State getState() {
+    if (terminalState != null) {
+      return terminalState;
+    }
+
+    return getStateWithRetries(STATUS_POLLING_ATTEMPTS, Sleeper.DEFAULT);
+  }
+
+  /**
+   * Attempts to get the state. Uses exponential backoff on failure, up to the maximum number
+   * of attempts passed in.
+   *
+   * @param attempts The number of attempts to make.
+   * @param sleeper Object used to sleep between attempts.
+   * @return The state of the job or State.UNKNOWN in case of failure.
+   */
+  @VisibleForTesting
+  State getStateWithRetries(int attempts, Sleeper sleeper) {
+    if (terminalState != null) {
+      return terminalState;
+    }
+    try {
+      Job job = getJobWithRetries(attempts, sleeper);
+      return MonitoringUtil.toState(job.getCurrentState());
+    } catch (IOException exn) {
+      // The only IOException that getJobWithRetries is permitted to throw is the final IOException
+      // that caused the retries to fail. Other exceptions are wrapped in unchecked exceptions
+      // and will propagate.
+      return State.UNKNOWN;
+    }
+  }
+
+  /**
+   * Attempts to get the underlying {@link Job}. Uses exponential backoff on failure, up to the
+   * maximum number of attempts passed in.
+   *
+   * @param attempts The number of attempts to make.
+   * @param sleeper Object used to sleep between attempts.
+   * @return The underlying {@link Job} object.
+   * @throws IOException if the maximum number of retries is exhausted; the last
+   *     exception encountered is rethrown.
+   */
+  @VisibleForTesting
+  Job getJobWithRetries(int attempts, Sleeper sleeper) throws IOException {
+    AttemptBoundedExponentialBackOff backoff =
+        new AttemptBoundedExponentialBackOff(attempts, STATUS_POLLING_INTERVAL);
+
+    // Retry loop ends in return or throw
+    while (true) {
+      try {
+        Job job = dataflowClient
+            .projects()
+            .jobs()
+            .get(projectId, jobId)
+            .execute();
+        State currentState = MonitoringUtil.toState(job.getCurrentState());
+        if (currentState.isTerminal()) {
+          terminalState = currentState;
+          replacedByJob = new DataflowPipelineJob(
+              getProjectId(), job.getReplacedByJobId(), dataflowClient, aggregatorTransforms);
+        }
+        return job;
+      } catch (IOException exn) {
+        LOG.warn("There were problems getting current job status: {}.", exn.getMessage());
+        LOG.debug("Exception information:", exn);
+
+        if (!nextBackOff(sleeper, backoff)) {
+          throw exn;
+        }
+      }
+    }
+  }
+
+  /**
+   * Identical to {@link BackOffUtils#next} but without checked exceptions.
+   */
+  private boolean nextBackOff(Sleeper sleeper, BackOff backoff) {
+    try {
+      return BackOffUtils.next(sleeper, backoff);
+    } catch (InterruptedException | IOException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator)
+      throws AggregatorRetrievalException {
+    try {
+      return new MapAggregatorValues<>(fromMetricUpdates(aggregator));
+    } catch (IOException e) {
+      throw new AggregatorRetrievalException(
+          "IOException when retrieving Aggregator values for Aggregator " + aggregator, e);
+    }
+  }
+
+  private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator)
+      throws IOException {
+    if (aggregatorTransforms.contains(aggregator)) {
+      List<MetricUpdate> metricUpdates;
+      if (terminalMetricUpdates != null) {
+        metricUpdates = terminalMetricUpdates;
+      } else {
+        boolean terminal = getState().isTerminal();
+        JobMetrics jobMetrics =
+            dataflowClient.projects().jobs().getMetrics(projectId, jobId).execute();
+        metricUpdates = jobMetrics.getMetrics();
+        if (terminal && jobMetrics.getMetrics() != null) {
+          terminalMetricUpdates = metricUpdates;
+        }
+      }
+
+      return DataflowMetricUpdateExtractor.fromMetricUpdates(
+          aggregator, aggregatorTransforms, metricUpdates);
+    } else {
+      throw new IllegalArgumentException(
+          "Aggregator " + aggregator + " is not used in this pipeline");
+    }
+  }
+}
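A brief, hypothetical sketch tying the pieces of this class together: wait with a bounded timeout, treat the documented null return as a timeout, and cancel the still-running job. The one-hour bound and the decision to cancel on timeout are illustrative choices, not anything this commit prescribes.

import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.sdk.PipelineResult;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

class JobMonitoringSketch {
  static PipelineResult.State waitThenCancel(DataflowPipelineJob job)
      throws IOException, InterruptedException {
    // waitToFinish returns null on timeout or interruption rather than a terminal state.
    PipelineResult.State finalState = job.waitToFinish(1, TimeUnit.HOURS, null);
    if (finalState == null) {
      job.cancel();  // issues a JOB_STATE_CANCELLED update request to the service
    }
    return finalState;
  }
}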

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
new file mode 100644
index 0000000..d7d243f
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for the
+ * {@link DataflowPipelineRunner}.
+ */
+public class DataflowPipelineRegistrar {
+  private DataflowPipelineRegistrar() { }
+
+  /**
+   * Register the {@link DataflowPipelineOptions} and {@link BlockingDataflowPipelineOptions}.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(
+          DataflowPipelineOptions.class,
+          BlockingDataflowPipelineOptions.class);
+    }
+  }
+
+  /**
+   * Register the {@link DataflowPipelineRunner} and {@link BlockingDataflowPipelineRunner}.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+          DataflowPipelineRunner.class,
+          BlockingDataflowPipelineRunner.class);
+    }
+  }
+}
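Because these registrars are published through @AutoService (and discovered via ServiceLoader), the runner and its options become visible to PipelineOptionsFactory without explicit registration. A small sketch, assuming the usual shorthand of naming the runner class on the command line (e.g. --runner=BlockingDataflowPipelineRunner):

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

class RegistrarSelectionSketch {
  public static void main(String[] args) {
    // With the registrar on the classpath, the simple class name given in --runner
    // resolves to the registered runner class.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    System.out.println("Selected runner: " + options.getRunner().getSimpleName());
  }
}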

