beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-1120] Move some DataflowRunner configurations from code to properties
Date Mon, 12 Dec 2016 17:57:11 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 0afadf64f -> 2f2617c36


[BEAM-1120] Move some DataflowRunner configurations from code to properties


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cfcfa2f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cfcfa2f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cfcfa2f3

Branch: refs/heads/master
Commit: cfcfa2f3e739a3a71b1ec9edf31f8023e1a5ed3f
Parents: 0afadf6
Author: Dan Halperin <dhalperi@google.com>
Authored: Fri Dec 9 18:35:52 2016 +0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon Dec 12 09:56:52 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 14 +--
 .../runners/dataflow/DataflowRunnerInfo.java    | 92 ++++++++++++++++++++
 .../DataflowPipelineWorkerPoolOptions.java      |  6 +-
 .../beam/runners/dataflow/dataflow.properties   | 23 +++++
 .../dataflow/DataflowRunnerInfoTest.java        | 51 +++++++++++
 .../org/apache/beam/sdk/util/ReleaseInfo.java   |  4 -
 6 files changed, 172 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfcfa2f3/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d902ccb..711b1b0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -205,16 +205,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   /** A set of user defined functions to invoke at different points in execution. */
   private DataflowRunnerHooks hooks;
 
-  // Environment version information.
-  private static final String ENVIRONMENT_MAJOR_VERSION = "6";
-
-  // Default Docker container images that execute Dataflow worker harness, residing in Google
-  // Container Registry, separately for Batch and Streaming.
-  public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE =
-      "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161205";
-  public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE =
-      "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161205";
-
   // The limit of CreateJob request size.
   private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;
 
@@ -546,7 +536,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     // Requirements about the service.
     Map<String, Object> environmentVersion = new HashMap<>();
-    environmentVersion.put(PropertyNames.ENVIRONMENT_VERSION_MAJOR_KEY, ENVIRONMENT_MAJOR_VERSION);
+    environmentVersion.put(
+        PropertyNames.ENVIRONMENT_VERSION_MAJOR_KEY,
+        DataflowRunnerInfo.getDataflowRunnerInfo().getEnvironmentMajorVersion());
     newJob.getEnvironment().setVersion(environmentVersion);
     // Default jobType is JAVA_BATCH_AUTOSCALING: A Java job with workers that the job can
     // autoscale if specified.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfcfa2f3/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
new file mode 100644
index 0000000..59cb8a4
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Populates versioning and other information for {@link DataflowRunner}.
+ */
+public final class DataflowRunnerInfo {
+  private static final Logger LOG = LoggerFactory.getLogger(DataflowRunnerInfo.class);
+
+  /** Classpath location of the runner properties file bundled with this module. */
+  private static final String PROPERTIES_PATH =
+      "/org/apache/beam/runners/dataflow/dataflow.properties";
+
+  /** Holds the singleton so that it is constructed only on first access. */
+  private static class LazyInit {
+    private static final DataflowRunnerInfo INSTANCE = new DataflowRunnerInfo(PROPERTIES_PATH);
+  }
+
+  /**
+   * Returns the shared instance of {@link DataflowRunnerInfo}, loaded from the bundled
+   * {@code dataflow.properties} resource on first call.
+   */
+  public static DataflowRunnerInfo getDataflowRunnerInfo() {
+    return LazyInit.INSTANCE;
+  }
+
+  private static final String ENVIRONMENT_MAJOR_VERSION_KEY = "environment.major.version";
+  private static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE_KEY = "worker.image.batch";
+  private static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE_KEY =
+      "worker.image.streaming";
+
+  /** Loaded properties; left empty when the resource is missing or cannot be read. */
+  private final Properties properties;
+
+  /**
+   * Provides the environment's major version number.
+   *
+   * @throws IllegalStateException if the property is not defined
+   */
+  public String getEnvironmentMajorVersion() {
+    return getRequiredProperty(
+        ENVIRONMENT_MAJOR_VERSION_KEY, "Unknown environment major version");
+  }
+
+  /**
+   * Provides the batch worker harness container image name.
+   *
+   * @throws IllegalStateException if the property is not defined
+   */
+  public String getBatchWorkerHarnessContainerImage() {
+    return getRequiredProperty(
+        BATCH_WORKER_HARNESS_CONTAINER_IMAGE_KEY, "Unknown batch worker harness container image");
+  }
+
+  /**
+   * Provides the streaming worker harness container image name.
+   *
+   * @throws IllegalStateException if the property is not defined
+   */
+  public String getStreamingWorkerHarnessContainerImage() {
+    return getRequiredProperty(
+        STREAMING_WORKER_HARNESS_CONTAINER_IMAGE_KEY,
+        "Unknown streaming worker harness container image");
+  }
+
+  /** Returns the value stored under {@code key}, failing with {@code errorMessage} if absent. */
+  private String getRequiredProperty(String key, String errorMessage) {
+    checkState(properties.containsKey(key), errorMessage);
+    return properties.getProperty(key);
+  }
+
+  /**
+   * Loads properties from the classpath resource at {@code resourcePath}.
+   *
+   * <p>A missing or unreadable resource is logged rather than thrown, leaving the properties
+   * empty; the accessors then fail with {@link IllegalStateException} when called.
+   */
+  private DataflowRunnerInfo(String resourcePath) {
+    properties = new Properties();
+
+    // Read the path we were given (not the PROPERTIES_PATH constant) so the path reported in the
+    // log messages is always the one that was actually loaded.
+    try (InputStream in = DataflowRunnerInfo.class.getResourceAsStream(resourcePath)) {
+      if (in == null) {
+        LOG.warn("Dataflow runner properties resource not found: {}", resourcePath);
+        return;
+      }
+
+      properties.load(in);
+    } catch (IOException e) {
+      // SLF4J treats the trailing Throwable argument as the exception to log.
+      LOG.warn("Error loading Dataflow runner properties resource: {}", resourcePath, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfcfa2f3/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
index 157321a..05086b0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.dataflow.options;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.util.List;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.DataflowRunnerInfo;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -130,9 +130,9 @@ public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions {
     public String create(PipelineOptions options) {
       DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
       if (dataflowOptions.isStreaming()) {
-        return DataflowRunner.STREAMING_WORKER_HARNESS_CONTAINER_IMAGE;
+        return DataflowRunnerInfo.getDataflowRunnerInfo().getStreamingWorkerHarnessContainerImage();
       } else {
-        return DataflowRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE;
+        return DataflowRunnerInfo.getDataflowRunnerInfo().getBatchWorkerHarnessContainerImage();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfcfa2f3/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
new file mode 100644
index 0000000..1eae8cb
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -0,0 +1,23 @@
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+# Dataflow runtime properties
+
+environment.major.version=6
+
+worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161205
+
+worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161205

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfcfa2f3/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
new file mode 100644
index 0000000..9b5b374
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+/**
+ * Tests for {@link DataflowRunnerInfo}.
+ */
+public class DataflowRunnerInfoTest {
+
+  @Test
+  public void getDataflowRunnerInfo() throws Exception {
+    DataflowRunnerInfo info = DataflowRunnerInfo.getDataflowRunnerInfo();
+
+    String version = info.getEnvironmentMajorVersion();
+    // Validate major version is a number
+    assertTrue(
+        String.format("Environment major version number %s is not a number", version),
+        version.matches("\\d+"));
+
+    // Validate container images contain gcr.io
+    assertThat(
+        "batch worker harness container image invalid",
+        info.getBatchWorkerHarnessContainerImage(),
+        containsString("gcr.io"));
+    assertThat(
+        "streaming worker harness container image invalid",
+        info.getStreamingWorkerHarnessContainerImage(),
+        containsString("gcr.io"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfcfa2f3/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReleaseInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReleaseInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReleaseInfo.java
index ba80de9..eeac557 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReleaseInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReleaseInfo.java
@@ -74,10 +74,6 @@ public final class ReleaseInfo extends GenericJson {
     }
 
     for (String name : properties.stringPropertyNames()) {
-      if (name.equals("name")) {
-        // We don't allow the properties to override the SDK name.
-        continue;
-      }
       put(name, properties.getProperty(name));
     }
   }


Mime
View raw message