Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 82920200C6F for ; Tue, 9 May 2017 20:03:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8117A160BB6; Tue, 9 May 2017 18:03:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CFCA9160B9A for ; Tue, 9 May 2017 20:03:52 +0200 (CEST) Received: (qmail 33664 invoked by uid 500); 9 May 2017 18:03:52 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 33650 invoked by uid 99); 9 May 2017 18:03:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 May 2017 18:03:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C727FDFAF3; Tue, 9 May 2017 18:03:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.apache.org Date: Tue, 09 May 2017 18:03:51 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: [BEAM-2211] Move PathValidator into GCP-Core archived-at: Tue, 09 May 2017 18:03:54 -0000 Repository: beam Updated Branches: refs/heads/master 5e4fd1b95 -> 28180c45b [BEAM-2211] Move PathValidator into GCP-Core For now, this does not need to be a Beam concept Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d165a73 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d165a73 Diff: 
http://git-wip-us.apache.org/repos/asf/beam/diff/7d165a73 Branch: refs/heads/master Commit: 7d165a73070468ff3a8907bcd5c3a3c4972d79e5 Parents: 5e4fd1b Author: Dan Halperin Authored: Mon May 8 16:01:40 2017 -0700 Committer: Dan Halperin Committed: Tue May 9 11:03:38 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 2 +- .../DataflowPipelineTranslatorTest.java | 2 +- .../beam/sdk/io/fs/NoopPathValidator.java | 52 --------- .../apache/beam/sdk/io/fs/PathValidator.java | 58 ---------- .../apache/beam/sdk/util/NoopPathValidator.java | 51 --------- .../sdk/extensions/gcp/options/GcpOptions.java | 2 +- .../sdk/extensions/gcp/options/GcsOptions.java | 4 +- .../gcp/storage/GcsPathValidator.java | 107 +++++++++++++++++++ .../gcp/storage/NoopPathValidator.java | 53 +++++++++ .../extensions/gcp/storage/PathValidator.java | 59 ++++++++++ .../apache/beam/sdk/util/GcsPathValidator.java | 107 ------------------- .../apache/beam/sdk/util/NoopPathValidator.java | 51 +++++++++ .../gcp/storage/GcsPathValidatorTest.java | 107 +++++++++++++++++++ .../beam/sdk/util/GcsPathValidatorTest.java | 106 ------------------ 14 files changed, 382 insertions(+), 379 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 250c064..2ef8737 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -83,11 +83,11 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.extensions.gcp.storage.PathValidator; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.io.fs.PathValidator; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 9040f8f..93c1e5b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -77,6 +77,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; +import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; @@ -93,7 +94,6 
@@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.GcsPathValidator; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/NoopPathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/NoopPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/NoopPathValidator.java deleted file mode 100644 index d5be8f0..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/NoopPathValidator.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.fs; - -import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.options.PipelineOptions; - -/** - * For internal use only; no backwards compatibility guarantees. - * - *
<p>
Noop implementation of {@link PathValidator}. All paths are allowed and returned unchanged. - */ -@Internal -public class NoopPathValidator implements PathValidator { - - private NoopPathValidator() { - } - - public static PathValidator fromOptions( - @SuppressWarnings("unused") PipelineOptions options) { - return new NoopPathValidator(); - } - - @Override - public void validateInputFilePatternSupported(String filepattern) {} - - @Override - public void validateOutputFilePrefixSupported(String filePrefix) {} - - @Override - public void validateOutputResourceSupported(ResourceId resourceId) {} - - @Override - public String verifyPath(String path) { - return path; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/PathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/PathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/PathValidator.java deleted file mode 100644 index b88a33e..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/PathValidator.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.fs; - -import org.apache.beam.sdk.annotations.Internal; - -/** - * For internal use only; no backwards compatibility guarantees. - * - *
<p>
Interface for controlling validation of paths. - */ -@Internal -public interface PathValidator { - /** - * Validate that a file pattern is conforming. - * - * @param filepattern The file pattern to verify. - */ - void validateInputFilePatternSupported(String filepattern); - - /** - * Validate that an output file prefix is conforming. - * - * @param filePrefix the file prefix to verify. - */ - void validateOutputFilePrefixSupported(String filePrefix); - - /** - * Validates that an output path is conforming. - * - * @param resourceId the file prefix to verify. - */ - void validateOutputResourceSupported(ResourceId resourceId); - - /** - * Validate that a path is a valid path and that the path - * is accessible. - * - * @param path The path to verify. - * @return The post-validation path. - */ - String verifyPath(String path); -} http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java deleted file mode 100644 index 0015e59..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.io.fs.PathValidator; -import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.options.PipelineOptions; - -/** - * @deprecated use {@link org.apache.beam.sdk.io.fs.NoopPathValidator}. - */ -@Deprecated -public class NoopPathValidator implements PathValidator { - - private NoopPathValidator() { - } - - public static PathValidator fromOptions( - @SuppressWarnings("unused") PipelineOptions options) { - return new NoopPathValidator(); - } - - @Override - public void validateInputFilePatternSupported(String filepattern) {} - - @Override - public void validateOutputFilePrefixSupported(String filePrefix) {} - - @Override - public void validateOutputResourceSupported(ResourceId resourceId) {} - - @Override - public String verifyPath(String path) { - return path; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java index 985520f..1e25560 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java +++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -48,7 +48,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory; import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory; import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer; -import org.apache.beam.sdk.io.fs.PathValidator; +import org.apache.beam.sdk.extensions.gcp.storage.PathValidator; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 78e233e..7ac9b69 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -26,14 +26,14 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; -import org.apache.beam.sdk.io.fs.PathValidator; +import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator; +import org.apache.beam.sdk.extensions.gcp.storage.PathValidator; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import 
org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.GcsPathValidator; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.InstanceBuilder; http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java new file mode 100644 index 0000000..e7257b2 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.gcp.storage; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.gcsfs.GcsPath; + +/** + * GCP implementation of {@link PathValidator}. Only GCS paths are allowed. + */ +public class GcsPathValidator implements PathValidator { + + private GcsOptions gcpOptions; + + private GcsPathValidator(GcsOptions options) { + this.gcpOptions = options; + } + + public static GcsPathValidator fromOptions(PipelineOptions options) { + return new GcsPathValidator(options.as(GcsOptions.class)); + } + + /** + * Validates the the input GCS path is accessible and that the path + * is well formed. + */ + @Override + public void validateInputFilePatternSupported(String filepattern) { + GcsPath gcsPath = getGcsPath(filepattern); + checkArgument(GcsUtil.isGcsPatternSupported(gcsPath.getObject())); + verifyPath(filepattern); + verifyPathIsAccessible(filepattern, "Could not find file %s"); + } + + /** + * Validates the the output GCS path is accessible and that the path + * is well formed. 
+ */ + @Override + public void validateOutputFilePrefixSupported(String filePrefix) { + verifyPath(filePrefix); + verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s"); + } + + @Override + public void validateOutputResourceSupported(ResourceId resourceId) { + checkArgument( + resourceId.getScheme().equals("gs"), + "Expected a valid 'gs://' path but was given: '%s'", + resourceId); + verifyPath(resourceId.toString()); + } + + @Override + public String verifyPath(String path) { + GcsPath gcsPath = getGcsPath(path); + checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow"); + checkArgument(!gcsPath.getObject().isEmpty(), + "Missing object or bucket in path: '%s', did you mean: 'gs://some-bucket/%s'?", + gcsPath, gcsPath.getBucket()); + checkArgument(!gcsPath.getObject().contains("//"), + "Dataflow Service does not allow objects with consecutive slashes"); + return gcsPath.toResourceName(); + } + + private void verifyPathIsAccessible(String path, String errorMessage) { + GcsPath gcsPath = getGcsPath(path); + try { + checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath), + errorMessage, path); + } catch (IOException e) { + throw new RuntimeException( + String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()), + e); + } + } + + private GcsPath getGcsPath(String path) { + try { + return GcsPath.fromUri(path); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(String.format( + "Expected a valid 'gs://' path but was given '%s'", path), e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/NoopPathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/NoopPathValidator.java 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/NoopPathValidator.java new file mode 100644 index 0000000..79b8732 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/NoopPathValidator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.storage; + +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * For internal use only; no backwards compatibility guarantees. + * + *
<p>
Noop implementation of {@link PathValidator}. All paths are allowed and returned unchanged. + */ +@Internal +public class NoopPathValidator implements PathValidator { + + private NoopPathValidator() { + } + + public static PathValidator fromOptions( + @SuppressWarnings("unused") PipelineOptions options) { + return new NoopPathValidator(); + } + + @Override + public void validateInputFilePatternSupported(String filepattern) {} + + @Override + public void validateOutputFilePrefixSupported(String filePrefix) {} + + @Override + public void validateOutputResourceSupported(ResourceId resourceId) {} + + @Override + public String verifyPath(String path) { + return path; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/PathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/PathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/PathValidator.java new file mode 100644 index 0000000..cc769c8 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/PathValidator.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.storage; + +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.fs.ResourceId; + +/** + * For internal use only; no backwards compatibility guarantees. + * + *
<p>
Interface for controlling validation of paths. + */ +@Internal +public interface PathValidator { + /** + * Validate that a file pattern is conforming. + * + * @param filepattern The file pattern to verify. + */ + void validateInputFilePatternSupported(String filepattern); + + /** + * Validate that an output file prefix is conforming. + * + * @param filePrefix the file prefix to verify. + */ + void validateOutputFilePrefixSupported(String filePrefix); + + /** + * Validates that an output path is conforming. + * + * @param resourceId the file prefix to verify. + */ + void validateOutputResourceSupported(ResourceId resourceId); + + /** + * Validate that a path is a valid path and that the path + * is accessible. + * + * @param path The path to verify. + * @return The post-validation path. + */ + String verifyPath(String path); +} http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java deleted file mode 100644 index c4e557b..0000000 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.io.IOException; -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.io.fs.PathValidator; -import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.gcsfs.GcsPath; - -/** - * GCP implementation of {@link PathValidator}. Only GCS paths are allowed. - */ -public class GcsPathValidator implements PathValidator { - - private GcsOptions gcpOptions; - - private GcsPathValidator(GcsOptions options) { - this.gcpOptions = options; - } - - public static GcsPathValidator fromOptions(PipelineOptions options) { - return new GcsPathValidator(options.as(GcsOptions.class)); - } - - /** - * Validates the the input GCS path is accessible and that the path - * is well formed. - */ - @Override - public void validateInputFilePatternSupported(String filepattern) { - GcsPath gcsPath = getGcsPath(filepattern); - checkArgument(GcsUtil.isGcsPatternSupported(gcsPath.getObject())); - verifyPath(filepattern); - verifyPathIsAccessible(filepattern, "Could not find file %s"); - } - - /** - * Validates the the output GCS path is accessible and that the path - * is well formed. 
- */ - @Override - public void validateOutputFilePrefixSupported(String filePrefix) { - verifyPath(filePrefix); - verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s"); - } - - @Override - public void validateOutputResourceSupported(ResourceId resourceId) { - checkArgument( - resourceId.getScheme().equals("gs"), - "Expected a valid 'gs://' path but was given: '%s'", - resourceId); - verifyPath(resourceId.toString()); - } - - @Override - public String verifyPath(String path) { - GcsPath gcsPath = getGcsPath(path); - checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow"); - checkArgument(!gcsPath.getObject().isEmpty(), - "Missing object or bucket in path: '%s', did you mean: 'gs://some-bucket/%s'?", - gcsPath, gcsPath.getBucket()); - checkArgument(!gcsPath.getObject().contains("//"), - "Dataflow Service does not allow objects with consecutive slashes"); - return gcsPath.toResourceName(); - } - - private void verifyPathIsAccessible(String path, String errorMessage) { - GcsPath gcsPath = getGcsPath(path); - try { - checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath), - errorMessage, path); - } catch (IOException e) { - throw new RuntimeException( - String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()), - e); - } - } - - private GcsPath getGcsPath(String path) { - try { - return GcsPath.fromUri(path); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException(String.format( - "Expected a valid 'gs://' path but was given '%s'", path), e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java new file mode 100644 index 0000000..85b68b2 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.extensions.gcp.storage.PathValidator; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * @deprecated use {@link org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator}. 
+ */ +@Deprecated +public class NoopPathValidator implements PathValidator { + + private NoopPathValidator() { + } + + public static PathValidator fromOptions( + @SuppressWarnings("unused") PipelineOptions options) { + return new NoopPathValidator(); + } + + @Override + public void validateInputFilePatternSupported(String filepattern) {} + + @Override + public void validateOutputFilePrefixSupported(String filePrefix) {} + + @Override + public void validateOutputResourceSupported(ResourceId resourceId) {} + + @Override + public String verifyPath(String path) { + return path; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java new file mode 100644 index 0000000..91ac46c --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.storage; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link GcsPathValidator}. 
*/ +@RunWith(JUnit4.class) +public class GcsPathValidatorTest { + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Mock private GcsUtil mockGcsUtil; + private GcsPathValidator validator; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true); + GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + options.setGcsUtil(mockGcsUtil); + validator = GcsPathValidator.fromOptions(options); + } + + @Test + public void testValidFilePattern() { + validator.validateInputFilePatternSupported("gs://bucket/path"); + } + + @Test + public void testInvalidFilePattern() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "Expected a valid 'gs://' path but was given '/local/path'"); + validator.validateInputFilePatternSupported("/local/path"); + } + + @Test + public void testFilePatternMissingBucket() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "Missing object or bucket in path: 'gs://input/', " + + "did you mean: 'gs://some-bucket/input'?"); + validator.validateInputFilePatternSupported("gs://input"); + } + + @Test + public void testWhenBucketDoesNotExist() throws Exception { + when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "Could not find file gs://non-existent-bucket/location"); + validator.validateInputFilePatternSupported("gs://non-existent-bucket/location"); + } + + @Test + public void testValidOutputPrefix() { + validator.validateOutputFilePrefixSupported("gs://bucket/path"); + } + + @Test + public void testInvalidOutputPrefix() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "Expected a valid 'gs://' 
path but was given '/local/path'"); + validator.validateOutputFilePrefixSupported("/local/path"); + } + + @Test + public void testOutputPrefixMissingBucket() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "Missing object or bucket in path: 'gs://output/', " + + "did you mean: 'gs://some-bucket/output'?"); + validator.validateOutputFilePrefixSupported("gs://output"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java deleted file mode 100644 index 65fb228..0000000 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** Tests for {@link GcsPathValidator}. */ -@RunWith(JUnit4.class) -public class GcsPathValidatorTest { - @Rule public ExpectedException expectedException = ExpectedException.none(); - - @Mock private GcsUtil mockGcsUtil; - private GcsPathValidator validator; - - @Before - public void setUp() throws Exception { - MockitoAnnotations.initMocks(this); - when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true); - GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - options.setGcsUtil(mockGcsUtil); - validator = GcsPathValidator.fromOptions(options); - } - - @Test - public void testValidFilePattern() { - validator.validateInputFilePatternSupported("gs://bucket/path"); - } - - @Test - public void testInvalidFilePattern() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "Expected a valid 'gs://' path but was given '/local/path'"); - validator.validateInputFilePatternSupported("/local/path"); - } - - @Test - public void testFilePatternMissingBucket() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "Missing object or bucket in path: 'gs://input/', " - + "did you mean: 'gs://some-bucket/input'?"); - validator.validateInputFilePatternSupported("gs://input"); - } - - @Test - public void 
testWhenBucketDoesNotExist() throws Exception { - when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false); - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "Could not find file gs://non-existent-bucket/location"); - validator.validateInputFilePatternSupported("gs://non-existent-bucket/location"); - } - - @Test - public void testValidOutputPrefix() { - validator.validateOutputFilePrefixSupported("gs://bucket/path"); - } - - @Test - public void testInvalidOutputPrefix() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "Expected a valid 'gs://' path but was given '/local/path'"); - validator.validateOutputFilePrefixSupported("/local/path"); - } - - @Test - public void testOutputPrefixMissingBucket() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "Missing object or bucket in path: 'gs://output/', " - + "did you mean: 'gs://some-bucket/output'?"); - validator.validateOutputFilePrefixSupported("gs://output"); - } -}