Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 874DE200CE9 for ; Sat, 5 Aug 2017 01:44:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 85A7E16E6DE; Fri, 4 Aug 2017 23:44:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5CA1816E6CE for ; Sat, 5 Aug 2017 01:44:16 +0200 (CEST) Received: (qmail 29454 invoked by uid 500); 4 Aug 2017 23:44:15 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 29435 invoked by uid 99); 4 Aug 2017 23:44:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Aug 2017 23:44:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7270FF552D; Fri, 4 Aug 2017 23:44:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jkff@apache.org To: commits@beam.apache.org Date: Fri, 04 Aug 2017 23:44:16 -0000 Message-Id: <4bede05b5676440d84b8273eff561c50@git.apache.org> In-Reply-To: <6fa89d0757e9437385954b030f5bd771@git.apache.org> References: <6fa89d0757e9437385954b030f5bd771@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/4] beam git commit: Introduces EmptyMatchTreatment parameter to FileSystems.match() archived-at: Fri, 04 Aug 2017 23:44:17 -0000 Introduces EmptyMatchTreatment parameter to FileSystems.match() Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/db9aede2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/db9aede2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/db9aede2 Branch: refs/heads/master Commit: db9aede289f8546bb30113353f07aa75daa83eba Parents: 5e43b23 Author: Eugene Kirpichov Authored: Thu Aug 3 14:43:48 2017 -0700 Committer: Eugene Kirpichov Committed: Fri Aug 4 16:38:23 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/FileBasedSource.java | 52 ++++++++++---------- .../org/apache/beam/sdk/io/FileSystems.java | 46 +++++++++++++++++ .../java/org/apache/beam/sdk/io/TextSource.java | 7 ++- .../beam/sdk/io/fs/EmptyMatchTreatment.java | 46 +++++++++++++++++ .../org/apache/beam/sdk/io/fs/MatchResult.java | 5 +- .../apache/beam/sdk/io/FileBasedSourceTest.java | 51 +++++++++++++++++++ 6 files changed, 180 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index d4413c9..7f865de 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -23,19 +23,17 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import java.io.IOException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.ListIterator; import java.util.NoSuchElementException; import javax.annotation.Nullable; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.io.fs.MatchResult.Status; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -68,6 +66,7 @@ public abstract class FileBasedSource extends OffsetBasedSource { private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class); private final ValueProvider fileOrPatternSpec; + private final EmptyMatchTreatment emptyMatchTreatment; @Nullable private MatchResult.Metadata singleFileMetadata; private final Mode mode; @@ -80,15 +79,28 @@ public abstract class FileBasedSource extends OffsetBasedSource { } /** - * Create a {@code FileBaseSource} based on a file or a file pattern specification. + * Create a {@code FileBaseSource} based on a file or a file pattern specification, with the given + * strategy for treating filepatterns that do not match any files. */ - protected FileBasedSource(ValueProvider fileOrPatternSpec, long minBundleSize) { + protected FileBasedSource( + ValueProvider fileOrPatternSpec, + EmptyMatchTreatment emptyMatchTreatment, + long minBundleSize) { super(0, Long.MAX_VALUE, minBundleSize); - mode = Mode.FILEPATTERN; + this.mode = Mode.FILEPATTERN; + this.emptyMatchTreatment = emptyMatchTreatment; this.fileOrPatternSpec = fileOrPatternSpec; } /** + * Like {@link #FileBasedSource(ValueProvider, EmptyMatchTreatment, long)}, but uses the default + * value of {@link EmptyMatchTreatment#DISALLOW}. + */ + protected FileBasedSource(ValueProvider fileOrPatternSpec, long minBundleSize) { + this(fileOrPatternSpec, EmptyMatchTreatment.DISALLOW, minBundleSize); + } + + /** * Create a {@code FileBasedSource} based on a single file. This constructor must be used when * creating a new {@code FileBasedSource} for a subrange of a single file. * Additionally, this constructor must be used to create new {@code FileBasedSource}s when @@ -110,6 +122,9 @@ public abstract class FileBasedSource extends OffsetBasedSource { mode = Mode.SINGLE_FILE_OR_SUBRANGE; this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata"); this.fileOrPatternSpec = StaticValueProvider.of(fileMetadata.resourceId().toString()); + + // This field will be unused in this mode. + this.emptyMatchTreatment = null; } /** @@ -204,14 +219,7 @@ public abstract class FileBasedSource extends OffsetBasedSource { if (mode == Mode.FILEPATTERN) { long totalSize = 0; - List inputs = FileSystems.match(Collections.singletonList(fileOrPattern)); - MatchResult result = Iterables.getOnlyElement(inputs); - checkArgument( - result.status() == Status.OK, - "Error matching the pattern or glob %s: status %s", - fileOrPattern, - result.status()); - List allMatches = result.metadata(); + List allMatches = FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata(); for (Metadata metadata : allMatches) { totalSize += metadata.sizeBytes(); } @@ -254,9 +262,8 @@ public abstract class FileBasedSource extends OffsetBasedSource { if (mode == Mode.FILEPATTERN) { long startTime = System.currentTimeMillis(); - List expandedFiles = FileBasedSource.expandFilePattern(fileOrPattern); - checkArgument(!expandedFiles.isEmpty(), - "Unable to find any files matching %s", fileOrPattern); + List expandedFiles = + FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata(); List> splitResults = new ArrayList<>(expandedFiles.size()); for (Metadata metadata : expandedFiles) { FileBasedSource split = createForSubrangeOfFile(metadata, 0, metadata.sizeBytes()); @@ -327,7 +334,9 @@ public abstract class FileBasedSource extends OffsetBasedSource { if (mode == Mode.FILEPATTERN) { long startTime = System.currentTimeMillis(); - List fileMetadata = FileBasedSource.expandFilePattern(fileOrPattern); + List fileMetadata = + FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata(); + LOG.info("Matched {} files for pattern {}", fileMetadata.size(), fileOrPattern); List> fileReaders = new ArrayList<>(); for (Metadata metadata : fileMetadata) { long endOffset = metadata.sizeBytes(); @@ -389,13 +398,6 @@ public abstract class FileBasedSource extends OffsetBasedSource { return metadata.sizeBytes(); } - private static List expandFilePattern(String fileOrPatternSpec) throws IOException { - MatchResult matches = - Iterables.getOnlyElement(FileSystems.match(Collections.singletonList(fileOrPatternSpec))); - LOG.info("Matched {} files for pattern {}", matches.metadata().size(), fileOrPatternSpec); - return ImmutableList.copyOf(matches.metadata()); - } - /** * A {@link Source.Reader reader} that implements code common to readers of * {@code FileBasedSource}s. http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index bd4668f..96394b8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -54,6 +54,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MatchResult.Status; @@ -72,6 +73,8 @@ public class FileSystems { public static final String DEFAULT_SCHEME = "file"; private static final Pattern FILE_SCHEME_PATTERN = Pattern.compile("(?[a-zA-Z][-a-zA-Z0-9+.]*):.*"); + private static final Pattern GLOB_PATTERN = + Pattern.compile("[*?{}]"); private static final AtomicReference> SCHEME_TO_FILESYSTEM = new AtomicReference>( @@ -79,6 +82,11 @@ public class FileSystems { /********************************** METHODS FOR CLIENT **********************************/ + /** Checks whether the given spec contains a glob wildcard character. */ + public static boolean hasGlobWildcard(String spec) { + return GLOB_PATTERN.matcher(spec).find(); + } + /** * This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}. * Callers should use {@link #match} to resolve users specs ambiguities before @@ -102,6 +110,9 @@ public class FileSystems { *

In case the spec schemes don't match any known {@link FileSystem} implementations, * FileSystems will attempt to use {@link LocalFileSystem} to resolve a path. * + *

Specs that do not match any resources are treated according to + * {@link EmptyMatchTreatment#DISALLOW}. + * * @return {@code List} in the same order of the input specs. * * @throws IllegalArgumentException if specs are invalid -- empty or have different schemes. @@ -114,6 +125,17 @@ public class FileSystems { return getFileSystemInternal(getOnlyScheme(specs)).match(specs); } + /** Like {@link #match(List)}, but with a configurable {@link EmptyMatchTreatment}. */ + public static List match(List specs, EmptyMatchTreatment emptyMatchTreatment) + throws IOException { + List matches = getFileSystemInternal(getOnlyScheme(specs)).match(specs); + List res = Lists.newArrayListWithExpectedSize(matches.size()); + for (int i = 0; i < matches.size(); i++) { + res.add(maybeAdjustEmptyMatchResult(specs.get(i), matches.get(i), emptyMatchTreatment)); + } + return res; + } + /** * Like {@link #match(List)}, but for a single resource specification. @@ -130,6 +152,30 @@ public class FileSystems { matches); return matches.get(0); } + + /** Like {@link #match(String)}, but with a configurable {@link EmptyMatchTreatment}. */ + public static MatchResult match(String spec, EmptyMatchTreatment emptyMatchTreatment) + throws IOException { + MatchResult res = match(spec); + return maybeAdjustEmptyMatchResult(spec, res, emptyMatchTreatment); + } + + private static MatchResult maybeAdjustEmptyMatchResult( + String spec, MatchResult res, EmptyMatchTreatment emptyMatchTreatment) + throws IOException { + if (res.status() != Status.NOT_FOUND) { + return res; + } + boolean notFoundAllowed = + emptyMatchTreatment == EmptyMatchTreatment.ALLOW + || (FileSystems.hasGlobWildcard(spec) + && emptyMatchTreatment == EmptyMatchTreatment.ALLOW_IF_WILDCARD); + if (notFoundAllowed) { + return MatchResult.create(Status.OK, Collections.emptyList()); + } + return res; + } + /** * Returns the {@link Metadata} for a single file resource. Expects a resource specification * {@code spec} that matches a single result. http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index 86c73d5..29188dc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -28,6 +28,7 @@ import java.nio.channels.SeekableByteChannel; import java.util.NoSuchElementException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; @@ -48,7 +49,11 @@ import org.apache.beam.sdk.options.ValueProvider; @VisibleForTesting class TextSource extends FileBasedSource { TextSource(ValueProvider fileSpec) { - super(fileSpec, 1L); + this(fileSpec, EmptyMatchTreatment.DISALLOW); + } + + TextSource(ValueProvider fileSpec, EmptyMatchTreatment emptyMatchTreatment) { + super(fileSpec, emptyMatchTreatment, 1L); } private TextSource(MatchResult.Metadata metadata, long start, long end) { http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java new file mode 100644 index 0000000..8e12993 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +import org.apache.beam.sdk.io.fs.MatchResult.Status; + +/** + * Options for allowing or disallowing filepatterns that match no resources in {@link + * org.apache.beam.sdk.io.FileSystems#match}. + */ +public enum EmptyMatchTreatment { + /** + * Filepatterns matching no resources are allowed. For such a filepattern, {@link + * MatchResult#status} will be {@link Status#OK} and {@link MatchResult#metadata} will return an + * empty list. + */ + ALLOW, + + /** + * Filepatterns matching no resources are disallowed. For such a filepattern, {@link + * MatchResult#status} will be {@link Status#NOT_FOUND} and {@link MatchResult#metadata} will + * throw a {@link java.io.FileNotFoundException}. + */ + DISALLOW, + + /** + * Filepatterns matching no resources are allowed if the filepattern contains a glob wildcard + * character, and disallowed otherwise (i.e. if the filepattern specifies a single file). + */ + ALLOW_IF_WILDCARD +} http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java index 642c049..aa80b96 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java @@ -21,6 +21,7 @@ import com.google.auto.value.AutoValue; import java.io.IOException; import java.io.Serializable; import java.util.List; +import org.apache.beam.sdk.io.FileSystems; /** * The result of {@link org.apache.beam.sdk.io.FileSystem#match}. @@ -78,7 +79,9 @@ public abstract class MatchResult { public abstract Status status(); /** - * {@link Metadata} of matched files. + * {@link Metadata} of matched files. Note that if {@link #status()} is {@link Status#NOT_FOUND}, + * this may either throw a {@link java.io.FileNotFoundException} or return an empty list, + * depending on the {@link EmptyMatchTreatment} used in the {@link FileSystems#match} call. */ public abstract List metadata() throws IOException; http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java index 1bdb915..ea9e06b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader; import org.apache.beam.sdk.io.Source.Reader; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -94,6 +95,15 @@ public class FileBasedSourceTest { } public TestFileBasedSource( + String fileOrPattern, + EmptyMatchTreatment emptyMatchTreatment, + long minBundleSize, + String splitHeader) { + super(StaticValueProvider.of(fileOrPattern), emptyMatchTreatment, minBundleSize); + this.splitHeader = splitHeader; + } + + public TestFileBasedSource( Metadata fileOrPattern, long minBundleSize, long startOffset, @@ -371,6 +381,47 @@ public class FileBasedSourceTest { } @Test + public void testEmptyFilepatternTreatmentDefaultDisallow() throws IOException { + PipelineOptions options = PipelineOptionsFactory.create(); + TestFileBasedSource source = + new TestFileBasedSource(new File(tempFolder.getRoot(), "doesNotExist").getPath(), 64, null); + thrown.expect(FileNotFoundException.class); + readFromSource(source, options); + } + + @Test + public void testEmptyFilepatternTreatmentAllow() throws IOException { + PipelineOptions options = PipelineOptionsFactory.create(); + TestFileBasedSource source = + new TestFileBasedSource( + new File(tempFolder.getRoot(), "doesNotExist").getPath(), + EmptyMatchTreatment.ALLOW, + 64, + null); + TestFileBasedSource sourceWithWildcard = + new TestFileBasedSource( + new File(tempFolder.getRoot(), "doesNotExist*").getPath(), + EmptyMatchTreatment.ALLOW_IF_WILDCARD, + 64, + null); + assertEquals(0, readFromSource(source, options).size()); + assertEquals(0, readFromSource(sourceWithWildcard, options).size()); + } + + @Test + public void testEmptyFilepatternTreatmentAllowIfWildcard() throws IOException { + PipelineOptions options = PipelineOptionsFactory.create(); + TestFileBasedSource source = + new TestFileBasedSource( + new File(tempFolder.getRoot(), "doesNotExist").getPath(), + EmptyMatchTreatment.ALLOW_IF_WILDCARD, + 64, + null); + thrown.expect(FileNotFoundException.class); + readFromSource(source, options); + } + + @Test public void testCloseUnstartedFilePatternReader() throws IOException { PipelineOptions options = PipelineOptionsFactory.create(); List data1 = createStringDataset(3, 50);