beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [1/2] beam git commit: Add a default bucket to GcpOptions
Date Wed, 18 Jan 2017 20:56:49 GMT
Repository: beam
Updated Branches:
  refs/heads/master faa2277b5 -> d13549334


Add a default bucket to GcpOptions

If gcpStagingLocation is not specified, construct a stable Default
Bucket. This bucket is used to stage GCP artifacts.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f29a4761
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f29a4761
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f29a4761

Branch: refs/heads/master
Commit: f29a4761220d2e9fa5752af79c6bce690903c760
Parents: faa2277
Author: Sam McVeety <sgmc@google.com>
Authored: Wed Jan 18 12:20:10 2017 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Wed Jan 18 12:56:04 2017 -0800

----------------------------------------------------------------------
 .../options/DataflowPipelineOptionsTest.java    |   4 +-
 .../org/apache/beam/sdk/options/GcpOptions.java |  36 ++++--
 .../org/apache/beam/sdk/util/DefaultBucket.java | 105 +++++++++++++++++
 .../apache/beam/sdk/util/GcpProjectUtil.java    |   2 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |   4 +-
 .../apache/beam/sdk/options/GcpOptionsTest.java |   4 +-
 .../apache/beam/sdk/util/DefaultBucketTest.java | 112 +++++++++++++++++++
 7 files changed, 250 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index 9dacfb2..30eee0e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -183,9 +183,9 @@ public class DataflowPipelineOptionsTest {
   @Test
   public void testDefaultStagingLocationUnset() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setProject("");
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Error constructing default value for stagingLocation: "
-        + "failed to retrieve gcpTempLocation.");
+    thrown.expectMessage("Error constructing default value for stagingLocation");
     options.getStagingLocation();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
index 042f4b4..c04e4f0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
@@ -17,12 +17,11 @@
  */
 package org.apache.beam.sdk.options;
 
-import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.auth.Credentials;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
 import com.google.common.io.Files;
 import java.io.File;
 import java.io.IOException;
@@ -34,6 +33,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.util.CredentialFactory;
+import org.apache.beam.sdk.util.DefaultBucket;
 import org.apache.beam.sdk.util.GcpCredentialFactory;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.PathValidator;
@@ -62,6 +62,17 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
   void setProject(String value);
 
   /**
+   * GCP <a href="https://developers.google.com/compute/docs/zones"
+   * >availability zone</a> for operations.
+   *
+   * <p>Default is set on a per-service basis.
+   */
+  @Description("GCP availability zone for running GCP operations. "
+      + "Default is up to the individual service.")
+  String getZone();
+  void setZone(String value);
+
+  /**
    * The class of the credential factory that should be created and used to create
    * credentials. If gcpCredential has not been set explicitly, an instance of this class will
    * be constructed and used as a credential factory.
@@ -197,15 +208,18 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
     @Nullable
     public String create(PipelineOptions options) {
       String tempLocation = options.getTempLocation();
-      checkArgument(!Strings.isNullOrEmpty(options.getTempLocation()),
-          "Error constructing default value for gcpTempLocation: tempLocation is not set");
-      try {
-        PathValidator validator = options.as(GcsOptions.class).getPathValidator();
-        validator.validateOutputFilePrefixSupported(tempLocation);
-      } catch (Exception e) {
-        throw new IllegalArgumentException(String.format(
-            "Error constructing default value for gcpTempLocation: tempLocation is not"
-                + " a valid GCS path, %s. ", tempLocation), e);
+      if (isNullOrEmpty(tempLocation)) {
+        tempLocation = DefaultBucket.tryCreateDefaultBucket(options);
+        options.setTempLocation(tempLocation);
+      } else {
+        try {
+          PathValidator validator = options.as(GcsOptions.class).getPathValidator();
+          validator.validateOutputFilePrefixSupported(tempLocation);
+        } catch (Exception e) {
+          throw new IllegalArgumentException(String.format(
+              "Error constructing default value for gcpTempLocation: tempLocation is not"
+              + " a valid GCS path, %s. ", tempLocation), e);
+        }
       }
       return tempLocation;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
new file mode 100644
index 0000000..75954c0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+import com.google.api.services.storage.model.Bucket;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for handling default GCS buckets.
+ */
+public class DefaultBucket {
+  static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
+
+  static final String DEFAULT_REGION = "us-central1";
+
+  /**
+   * Creates a default bucket or verifies the existence and proper access control
+   * of an existing default bucket.  Returns the location if successful.
+   */
+  public static String tryCreateDefaultBucket(PipelineOptions options) {
+    GcsOptions gcpOptions = options.as(GcsOptions.class);
+
+    final String projectId = gcpOptions.getProject();
+    checkArgument(!isNullOrEmpty(projectId),
+                  "--project is a required option.");
+
+    // Look up the project number, to create a default bucket with a stable
+    // name with no special characters.
+    long projectNumber = 0L;
+    try {
+      projectNumber = gcpOptions.as(CloudResourceManagerOptions.class)
+          .getGcpProjectUtil().getProjectNumber(projectId);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to verify project with ID " + projectId, e);
+    }
+    String region = DEFAULT_REGION;
+    if (!isNullOrEmpty(gcpOptions.getZone())) {
+      region = getRegionFromZone(gcpOptions.getZone());
+    }
+    final String bucketName =
+      "dataflow-staging-" + region + "-" + projectNumber;
+    LOG.info("No staging location provided, attempting to use default bucket: {}",
+             bucketName);
+    Bucket bucket = new Bucket()
+      .setName(bucketName)
+      .setLocation(region);
+    // Always try to create the bucket before checking access, so that we do not
+    // race with other pipelines that may be attempting to do the same thing.
+    try {
+      gcpOptions.getGcsUtil().createBucket(projectId, bucket);
+    } catch (FileAlreadyExistsException e) {
+      LOG.debug("Bucket '{}'' already exists, verifying access.", bucketName);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable create default bucket.", e);
+    }
+
+    // Once the bucket is expected to exist, verify that it is correctly owned
+    // by the project executing the job.
+    try {
+      long owner = gcpOptions.getGcsUtil().bucketOwner(
+        GcsPath.fromComponents(bucketName, ""));
+      checkArgument(
+        owner == projectNumber,
+        "Bucket owner does not match the project from --project:"
+        + " %s vs. %s", owner, projectNumber);
+    } catch (IOException e) {
+      throw new RuntimeException(
+        "Unable to determine the owner of the default bucket at gs://" + bucketName, e);
+    }
+    return "gs://" + bucketName;
+  }
+
+  @VisibleForTesting
+  static String getRegionFromZone(String zone) {
+    String[] zoneParts = zone.split("-");
+    checkArgument(zoneParts.length >= 2, "Invalid zone provided: %s", zone);
+    return zoneParts[0] + "-" + zoneParts[1];
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
index beac4e4..f73afe0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
@@ -76,7 +76,7 @@ public class GcpProjectUtil {
    * Returns the project number or throws an exception if the project does not
    * exist or has other access exceptions.
    */
-  long getProjectNumber(String projectId) throws IOException {
+  public long getProjectNumber(String projectId) throws IOException {
     return getProjectNumber(
       projectId,
       BACKOFF_FACTORY.backoff(),

http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 521673c..a10ea28 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -451,7 +451,9 @@ public class GcsUtil {
   void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper)
         throws IOException {
     Storage.Buckets.Insert insertBucket =
-      storageClient.buckets().insert(projectId, bucket);
+        storageClient.buckets().insert(projectId, bucket);
+    insertBucket.setPredefinedAcl("projectPrivate");
+    insertBucket.setPredefinedDefaultObjectAcl("projectPrivate");
 
     try {
       ResilientOperation.retry(

http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
index 7854d67..288383e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
@@ -109,9 +109,9 @@ public class GcpOptionsTest {
   @Test
   public void testEmptyGcpTempLocation() throws Exception {
     GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    options.setProject("");
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(
-        "Error constructing default value for gcpTempLocation: tempLocation is not set");
+    thrown.expectMessage("--project is a required option");
     options.getGcpTempLocation();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
new file mode 100644
index 0000000..395e1f3
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.storage.model.Bucket;
+import java.io.IOException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.MockitoAnnotations;
+import org.mockito.MockitoAnnotations.Mock;
+
+/** Tests for DefaultBucket. */
+@RunWith(JUnit4.class)
+public class DefaultBucketTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private PipelineOptions options;
+  @Mock
+  private GcsUtil gcsUtil;
+  @Mock
+  private GcpProjectUtil gcpUtil;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    options = PipelineOptionsFactory.create();
+    options.as(GcsOptions.class).setGcsUtil(gcsUtil);
+    options.as(CloudResourceManagerOptions.class).setGcpProjectUtil(gcpUtil);
+    options.as(GcpOptions.class).setProject("foo");
+    options.as(GcpOptions.class).setZone("us-north1-a");
+  }
+
+  @Test
+  public void testCreateBucket() {
+    String bucket = DefaultBucket.tryCreateDefaultBucket(options);
+    assertEquals("gs://dataflow-staging-us-north1-0", bucket);
+  }
+
+  @Test
+  public void testCreateBucketProjectLookupFails() throws IOException {
+    when(gcpUtil.getProjectNumber("foo")).thenThrow(new IOException("badness"));
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable to verify project");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  @Test
+  public void testCreateBucketCreateBucketFails() throws IOException {
+    doThrow(new IOException("badness")).when(
+      gcsUtil).createBucket(any(String.class), any(Bucket.class));
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable create default bucket");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  @Test
+  public void testCannotGetBucketOwner() throws IOException {
+    when(gcsUtil.bucketOwner(any(GcsPath.class)))
+      .thenThrow(new IOException("badness"));
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable to determine the owner");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  @Test
+  public void testProjectMismatch() throws IOException {
+    when(gcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Bucket owner does not match the project");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  @Test
+  public void regionFromZone() throws IOException {
+    assertEquals("us-central1", DefaultBucket.getRegionFromZone("us-central1-a"));
+    assertEquals("asia-east", DefaultBucket.getRegionFromZone("asia-east-a"));
+  }
+}


Mime
View raw message