Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8AFCA200C2F for ; Mon, 6 Mar 2017 21:49:52 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 89411160B81; Mon, 6 Mar 2017 20:49:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5CE8E160B76 for ; Mon, 6 Mar 2017 21:49:51 +0100 (CET) Received: (qmail 90652 invoked by uid 500); 6 Mar 2017 20:49:50 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 90341 invoked by uid 99); 6 Mar 2017 20:49:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Mar 2017 20:49:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 08AEFDFBD9; Mon, 6 Mar 2017 20:49:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.apache.org Date: Mon, 06 Mar 2017 20:49:50 -0000 Message-Id: <0f11c01084874a5c91d344161785a749@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: Revert "DataflowRunner: experimental support for issuing FnAPI based jobs" archived-at: Mon, 06 Mar 2017 20:49:52 -0000 Repository: beam Updated Branches: refs/heads/master 626bc38aa -> 5d120bd3f Revert "DataflowRunner: experimental support for issuing FnAPI based jobs" This reverts commit 131c9f916dae6345ec77a869112ae5901b568f23. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b31be1ca Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b31be1ca Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b31be1ca Branch: refs/heads/master Commit: b31be1caef6a691006082e3e440d0b42ff1d4165 Parents: 2f96bc3 Author: Kenneth Knowles Authored: Sun Mar 5 20:03:18 2017 -0800 Committer: Kenneth Knowles Committed: Sun Mar 5 20:03:18 2017 -0800 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 3 +- .../dataflow/DataflowPipelineTranslator.java | 3 +- .../beam/runners/dataflow/DataflowRunner.java | 46 ++++++++------------ .../runners/dataflow/DataflowRunnerInfo.java | 38 ++++++++-------- .../options/DataflowPipelineDebugOptions.java | 2 - .../DataflowPipelineWorkerPoolOptions.java | 10 ++--- .../beam/runners/dataflow/dataflow.properties | 8 ++-- .../dataflow/DataflowRunnerInfoTest.java | 23 +++++----- .../runners/dataflow/DataflowRunnerTest.java | 17 -------- 9 files changed, 58 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b31be1ca/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index fb06797..fdd088f 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -34,8 +34,7 @@ beam-master-20170228 - 1 - 6 + 6 http://git-wip-us.apache.org/repos/asf/beam/blob/b31be1ca/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index ab4cb9c..7a78a4c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -326,7 +326,8 @@ public class DataflowPipelineTranslator { workerPool.setNumWorkers(options.getNumWorkers()); if (options.isStreaming() - && !DataflowRunner.hasExperiment(options, "enable_windmill_service")) { + && (options.getExperiments() == null + || !options.getExperiments().contains("enable_windmill_service"))) { // Use separate data disk for streaming. Disk disk = new Disk(); disk.setDiskType(options.getWorkerDiskType()); http://git-wip-us.apache.org/repos/asf/beam/blob/b31be1ca/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index c609b54..15147f1 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -51,6 +51,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -303,12 +304,14 @@ public class DataflowRunner extends PipelineRunner { PTransformMatchers.parDoWithFnType(unsupported), UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, true))); } - if (!hasExperiment(options, "enable_custom_pubsub_source")) { + if (options.getExperiments() == null + || !options.getExperiments().contains("enable_custom_pubsub_source")) { ptoverrides.put( PTransformMatchers.classEqualTo(PubsubUnboundedSource.class), new ReflectiveRootOverrideFactory(StreamingPubsubIORead.class, this)); } - if (!hasExperiment(options, "enable_custom_pubsub_sink")) { + if (options.getExperiments() == null + || !options.getExperiments().contains("enable_custom_pubsub_sink")) { ptoverrides.put( PTransformMatchers.classEqualTo(PubsubUnboundedSink.class), new StreamingPubsubIOWriteOverrideFactory(this)); @@ -532,7 +535,20 @@ public class DataflowRunner extends PipelineRunner { workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage); } - newJob.getEnvironment().setVersion(getEnvironmentVersion(options)); + // Requirements about the service. + Map environmentVersion = new HashMap<>(); + environmentVersion.put( + PropertyNames.ENVIRONMENT_VERSION_MAJOR_KEY, + DataflowRunnerInfo.getDataflowRunnerInfo().getEnvironmentMajorVersion()); + newJob.getEnvironment().setVersion(environmentVersion); + // Default jobType is JAVA_BATCH_AUTOSCALING: A Java job with workers that the job can + // autoscale if specified. + String jobType = "JAVA_BATCH_AUTOSCALING"; + + if (options.isStreaming()) { + jobType = "STREAMING"; + } + environmentVersion.put(PropertyNames.ENVIRONMENT_VERSION_JOB_TYPE_KEY, jobType); if (hooks != null) { hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment()); @@ -640,30 +656,6 @@ public class DataflowRunner extends PipelineRunner { return dataflowPipelineJob; } - /** Returns true if the specified experiment is enabled, handling null experiments. */ - public static boolean hasExperiment(DataflowPipelineDebugOptions options, String experiment) { - List experiments = - firstNonNull(options.getExperiments(), Collections.emptyList()); - return experiments.contains(experiment); - } - - /** Helper to configure the Dataflow Job Environment based on the user's job options. */ - private static Map getEnvironmentVersion(DataflowPipelineOptions options) { - DataflowRunnerInfo runnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo(); - String majorVersion; - String jobType; - if (hasExperiment(options, "beam_fn_api")) { - majorVersion = runnerInfo.getFnApiEnvironmentMajorVersion(); - jobType = options.isStreaming() ? "FNAPI_STREAMING" : "FNAPI_BATCH"; - } else { - majorVersion = runnerInfo.getLegacyEnvironmentMajorVersion(); - jobType = options.isStreaming() ? "STREAMING" : "JAVA_BATCH_AUTOSCALING"; - } - return ImmutableMap.of( - PropertyNames.ENVIRONMENT_VERSION_MAJOR_KEY, majorVersion, - PropertyNames.ENVIRONMENT_VERSION_JOB_TYPE_KEY, jobType); - } - @VisibleForTesting void replaceTransforms(Pipeline pipeline) { for (Map.Entry override : overrides.entrySet()) { http://git-wip-us.apache.org/repos/asf/beam/blob/b31be1ca/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java index 12b3f38..59cb8a4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java @@ -47,34 +47,32 @@ public final class DataflowRunnerInfo { private Properties properties; - private static final String FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY = - "fnapi.environment.major.version"; - private static final String LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY = - "legacy.environment.major.version"; - private static final String CONTAINER_VERSION_KEY = "container.version"; + private static final String ENVIRONMENT_MAJOR_VERSION_KEY = "environment.major.version"; + private static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE_KEY = "worker.image.batch"; + private static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE_KEY = + "worker.image.streaming"; - /** Provides the legacy environment's major version number. */ - public String getLegacyEnvironmentMajorVersion() { + /** Provides the environment's major version number. */ + public String getEnvironmentMajorVersion() { checkState( - properties.containsKey(LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY), - "Unknown legacy environment major version"); - return properties.getProperty(LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY); + properties.containsKey(ENVIRONMENT_MAJOR_VERSION_KEY), "Unknown environment major version"); + return properties.getProperty(ENVIRONMENT_MAJOR_VERSION_KEY); } - /** Provides the FnAPI environment's major version number. */ - public String getFnApiEnvironmentMajorVersion() { + /** Provides the batch worker harness container image name. */ + public String getBatchWorkerHarnessContainerImage() { checkState( - properties.containsKey(FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY), - "Unknown FnAPI environment major version"); - return properties.getProperty(FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY); + properties.containsKey(BATCH_WORKER_HARNESS_CONTAINER_IMAGE_KEY), + "Unknown batch worker harness container image"); + return properties.getProperty(BATCH_WORKER_HARNESS_CONTAINER_IMAGE_KEY); } - /** Provides the container version that will be used for constructing harness image paths. */ - public String getContainerVersion() { + /** Provides the streaming worker harness container image name. */ + public String getStreamingWorkerHarnessContainerImage() { checkState( - properties.containsKey(CONTAINER_VERSION_KEY), - "Unknown container version"); - return properties.getProperty(CONTAINER_VERSION_KEY); + properties.containsKey(STREAMING_WORKER_HARNESS_CONTAINER_IMAGE_KEY), + "Unknown streaming worker harness container image"); + return properties.getProperty(STREAMING_WORKER_HARNESS_CONTAINER_IMAGE_KEY); } private DataflowRunnerInfo(String resourcePath) { http://git-wip-us.apache.org/repos/asf/beam/blob/b31be1ca/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 729bca4..cdfa3f5 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.api.services.dataflow.Dataflow; import java.util.List; import java.util.Map; -import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.util.DataflowTransport; import org.apache.beam.runners.dataflow.util.GcsStager; import org.apache.beam.runners.dataflow.util.Stager; @@ -54,7 +53,6 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions { + "be enabled with this flag. Please sync with the Dataflow team before enabling any " + "experiments.") @Experimental - @Nullable List getExperiments(); void setExperiments(List value); http://git-wip-us.apache.org/repos/asf/beam/blob/b31be1ca/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java index e2c4bf4..3c5d05a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.options; import com.fasterxml.jackson.annotation.JsonIgnore; import java.util.List; import javax.annotation.Nullable; -import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.DataflowRunnerInfo; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.Default; @@ -130,14 +129,11 @@ public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions { @Override public String create(PipelineOptions options) { DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); - String containerVersion = DataflowRunnerInfo.getDataflowRunnerInfo().getContainerVersion(); - String containerType; - if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api")) { - containerType = "java"; + if (dataflowOptions.isStreaming()) { + return DataflowRunnerInfo.getDataflowRunnerInfo().getStreamingWorkerHarnessContainerImage(); } else { - containerType = dataflowOptions.isStreaming() ? "beam-java-streaming" : "beam-java-batch"; + return DataflowRunnerInfo.getDataflowRunnerInfo().getBatchWorkerHarnessContainerImage(); } - return String.format("dataflow.gcr.io/v1beta3/%s:%s", containerType, containerVersion); } } http://git-wip-us.apache.org/repos/asf/beam/blob/b31be1ca/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties index ac68970..47e316c 100644 --- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties +++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties @@ -16,6 +16,8 @@ # # Dataflow runtime properties -legacy.environment.major.version=${dataflow.legacy_environment_major_version} -fnapi.environment.major.version=${dataflow.fnapi_environment_major_version} -container.version=${dataflow.container_version} +environment.major.version=${dataflow.environment_major_version} + +worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:${dataflow.container_version} + +worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:${dataflow.container_version} http://git-wip-us.apache.org/repos/asf/beam/blob/b31be1ca/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java index 3502040..9b5b374 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.dataflow; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -33,22 +32,20 @@ public class DataflowRunnerInfoTest { public void getDataflowRunnerInfo() throws Exception { DataflowRunnerInfo info = DataflowRunnerInfo.getDataflowRunnerInfo(); - String version = info.getLegacyEnvironmentMajorVersion(); + String version = info.getEnvironmentMajorVersion(); // Validate major version is a number assertTrue( - String.format("Legacy environment major version number %s is not a number", version), + String.format("Environment major version number %s is not a number", version), version.matches("\\d+")); - version = info.getFnApiEnvironmentMajorVersion(); - // Validate major version is a number - assertTrue( - String.format("FnAPI environment major version number %s is not a number", version), - version.matches("\\d+")); - - // Validate container version does not contain a $ (indicating it was not filled in). + // Validate container images contain gcr.io + assertThat( + "batch worker harness container image invalid", + info.getBatchWorkerHarnessContainerImage(), + containsString("gcr.io")); assertThat( - "container version invalid", - info.getContainerVersion(), - not(containsString("$"))); + "streaming worker harness container image invalid", + info.getStreamingWorkerHarnessContainerImage(), + containsString("gcr.io")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/b31be1ca/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 246feb0..a788077 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -24,7 +24,6 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -1119,20 +1118,4 @@ public class DataflowRunnerTest { thrown.expect(RuntimeException.class); p.run(); } - - @Test - public void testHasExperiment() { - DataflowPipelineDebugOptions options = - PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class); - - options.setExperiments(null); - assertFalse(DataflowRunner.hasExperiment(options, "foo")); - - options.setExperiments(ImmutableList.of("foo", "bar")); - assertTrue(DataflowRunner.hasExperiment(options, "foo")); - assertTrue(DataflowRunner.hasExperiment(options, "bar")); - assertFalse(DataflowRunner.hasExperiment(options, "baz")); - assertFalse(DataflowRunner.hasExperiment(options, "ba")); - assertFalse(DataflowRunner.hasExperiment(options, "BAR")); - } }