Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 04005200C5B for ; Thu, 27 Apr 2017 23:56:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 02915160B9E; Thu, 27 Apr 2017 21:56:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EF299160BA7 for ; Thu, 27 Apr 2017 23:56:32 +0200 (CEST) Received: (qmail 28751 invoked by uid 500); 27 Apr 2017 21:56:32 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 28737 invoked by uid 99); 27 Apr 2017 21:56:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Apr 2017 21:56:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D60D1E04B1; Thu, 27 Apr 2017 21:56:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.apache.org Date: Thu, 27 Apr 2017 21:56:31 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: [BEAM-59] Add FileSystems#matchNewResource archived-at: Thu, 27 Apr 2017 21:56:34 -0000 Repository: beam Updated Branches: refs/heads/master fdf2de999 -> 14a90d7c5 [BEAM-59] Add FileSystems#matchNewResource The new FileSystems API needs a way to generate a ResourceId for a resource that does not exist. This does not come up in sources, because we typically are just matching existing files. 
However, sinks need the ability to reference a new directory (say, in order to create it). Couldn't think of anything better than a simple function that says "treat this string as a full resource path with the specified type", which is what FileSystems#matchNewResource is. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f93a2775 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f93a2775 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f93a2775 Branch: refs/heads/master Commit: f93a27755921e73141fd97de042e6dcd04e10a47 Parents: fdf2de9 Author: Dan Halperin Authored: Tue Apr 25 17:29:02 2017 -0700 Committer: Dan Halperin Committed: Thu Apr 27 14:51:04 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/FileSystem.java | 12 +++++++ .../org/apache/beam/sdk/io/FileSystems.java | 15 +++++++++ .../org/apache/beam/sdk/io/LocalFileSystem.java | 7 ++++ .../org/apache/beam/sdk/io/fs/ResourceId.java | 17 +++++++++- .../apache/beam/sdk/io/LocalFileSystemTest.java | 22 ++++++++++++ .../beam/sdk/io/gcp/storage/GcsFileSystem.java | 35 +++++++++++++++----- .../beam/sdk/io/hdfs/HadoopFileSystem.java | 5 +++ 7 files changed, 104 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java index 001f596..76c5dc1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java @@ -139,4 +139,16 @@ public abstract class FileSystem { * to determine the state of the resources. 
*/ protected abstract void delete(Collection resourceIds) throws IOException; + + /** + * Returns a new {@link ResourceId} for this filesystem that represents the named resource. + * The user supplies both the resource spec and whether it is a directory. + * + *

<p>The supplied {@code singleResourceSpec} is expected to be in a proper format, including + * any necessary escaping, for this {@link FileSystem}. + * + *

This function may throw an {@link IllegalArgumentException} if given an invalid argument, + * such as when the specified {@code singleResourceSpec} is not a valid resource name. + */ + protected abstract ResourceIdT matchNewResource(String singleResourceSpec, boolean isDirectory); } http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 0b50070..b290498 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -485,4 +485,19 @@ public class FileSystems { } } } + + /** + * Returns a new {@link ResourceId} that represents the named resource of a type corresponding + * to the resource type. + * + *

<p>The supplied {@code singleResourceSpec} is expected to be in a proper format, including + * any necessary escaping, for the underlying {@link FileSystem}. + * + *

This function may throw an {@link IllegalArgumentException} if given an invalid argument, + * such as when the specified {@code singleResourceSpec} is not a valid resource name. + */ + public static ResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { + return getFileSystemInternal(parseScheme(singleResourceSpec)) + .matchNewResource(singleResourceSpec, isDirectory); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java index 8349a35..2d80ae4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java @@ -34,6 +34,7 @@ import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.PathMatcher; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; @@ -164,6 +165,12 @@ class LocalFileSystem extends FileSystem { } } + @Override + protected LocalResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { + Path path = Paths.get(singleResourceSpec); + return LocalResourceId.fromPath(path, isDirectory); + } + private MatchResult matchOne(String spec) throws IOException { File file = Paths.get(spec).toFile(); http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java index 
b7859ca..26a21bb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.fs; import java.io.Serializable; import org.apache.beam.sdk.io.FileSystem; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; /** @@ -27,7 +28,21 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; *

<p>{@link ResourceId} is hierarchical and composed of a sequence of directory * and file name elements separated by a special separator or delimiter. * - *

<p>TODO: add examples for how ResourceId is constructed and used. + *

<p>{@link ResourceId ResourceIds} are created using {@link FileSystems}. The two primary + * mechanisms are: + * + *

    + *
  <li>{@link FileSystems#match(java.util.List)}, which takes a list of {@link String} resource + * names or globs, queries the {@link FileSystem} for resources matching these specifications, + * and returns a {@link MatchResult} for each glob. This is typically used when reading from + * files. + * + *
  <li>{@link FileSystems#matchNewResource(String, boolean)}, which takes a {@link String} full + * resource name and type (file or directory) and generates a {@link FileSystem}-specific + * {@code ResourceId} for that resource. This call does not verify the presence or absence of that + * resource in the file system. This call is typically used when creating new directories or files + * to generate {@link ResourceId ResourceIds} for resources that may not yet exist. + *
*/ public interface ResourceId extends Serializable { http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java index d335974..ac4fe61 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java @@ -18,8 +18,10 @@ package org.apache.beam.sdk.io; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -301,6 +303,26 @@ public class LocalFileSystemTest { toFilenames(localFileSystem.match(ImmutableList.of(pattern.toString()))).isEmpty()); } + @Test + public void testMatchNewResource() throws Exception { + LocalResourceId fileResource = + localFileSystem + .matchNewResource("/some/test/resource/path", false /* isDirectory */); + LocalResourceId dirResource = + localFileSystem + .matchNewResource("/some/test/resource/path", true /* isDirectory */); + assertNotEquals(fileResource, dirResource); + assertThat( + fileResource.getCurrentDirectory().resolve( + "path", StandardResolveOptions.RESOLVE_DIRECTORY), + equalTo(dirResource.getCurrentDirectory())); + assertThat( + fileResource.getCurrentDirectory().resolve( + "path", StandardResolveOptions.RESOLVE_DIRECTORY), + equalTo(dirResource.getCurrentDirectory())); + assertThat(dirResource.toString(), equalTo("/some/test/resource/path/")); + } + private void createFileWithContent(Path path, String content) throws Exception { try (Writer 
writer = Channels.newWriter( localFileSystem.create( http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java index 2ae6b7e..1b0bd9d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.storage; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -37,6 +38,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.regex.Pattern; +import javax.annotation.Nullable; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.fs.CreateOptions; @@ -69,8 +71,7 @@ class GcsFileSystem extends FileSystem { List nonGlobs = Lists.newArrayList(); List isGlobBooleans = Lists.newArrayList(); - for (int i = 0; i < gcsPaths.size(); ++i) { - GcsPath path = gcsPaths.get(i); + for (GcsPath path : gcsPaths) { if (GcsUtil.isGlob(path)) { globs.add(path); isGlobBooleans.add(true); @@ -123,6 +124,22 @@ class GcsFileSystem extends FileSystem { } @Override + protected GcsResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { + if (isDirectory) { + if (!singleResourceSpec.endsWith("/")) { + singleResourceSpec += '/'; + } + } else { + checkArgument( + !singleResourceSpec.endsWith("/"), + "Expected a 
file path, but [%s], ends with '/'. This is unsupported in GcsFileSystem.", + singleResourceSpec); + } + GcsPath path = GcsPath.fromUri(singleResourceSpec); + return GcsResourceId.fromGcsPath(path); + } + + @Override protected void copy(List srcResourceIds, List destResourceIds) throws IOException { options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds)); @@ -196,13 +213,15 @@ class GcsFileSystem extends FileSystem { } private MatchResult toMatchResult(StorageObjectOrIOException objectOrException) { - if (objectOrException.ioException() instanceof FileNotFoundException) { - return MatchResult.create(Status.NOT_FOUND, objectOrException.ioException()); - } else if (objectOrException.ioException() != null) { - return MatchResult.create(Status.ERROR, objectOrException.ioException()); + @Nullable IOException exception = objectOrException.ioException(); + if (exception instanceof FileNotFoundException) { + return MatchResult.create(Status.NOT_FOUND, exception); + } else if (exception != null) { + return MatchResult.create(Status.ERROR, exception); } else { - return MatchResult.create( - Status.OK, new Metadata[]{toMetadata(objectOrException.storageObject())}); + StorageObject object = objectOrException.storageObject(); + assert object != null; // fix a warning; guaranteed by StorageObjectOrIOException semantics. 
+ return MatchResult.create(Status.OK, new Metadata[]{toMetadata(object)}); } } http://git-wip-us.apache.org/repos/asf/beam/blob/f93a2775/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java index f4e35ac..ca56a60 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java @@ -68,4 +68,9 @@ class HadoopFileSystem extends FileSystem { protected void delete(Collection resourceIds) throws IOException { throw new UnsupportedOperationException(); } + + @Override + protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { + throw new UnsupportedOperationException(); + } }