beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: [BEAM-59] Beam GcsFileSystem: implementation of match().
Date Fri, 17 Feb 2017 06:45:19 GMT
Repository: beam
Updated Branches:
  refs/heads/master 2ca3bf669 -> 9df1da493


[BEAM-59] Beam GcsFileSystem: implementation of match().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/36e87385
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/36e87385
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/36e87385

Branch: refs/heads/master
Commit: 36e873858b00d1e1136833023646bf1d40c0466c
Parents: 2ca3bf6
Author: Pei He <peihe@google.com>
Authored: Thu Feb 16 11:26:45 2017 -0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Thu Feb 16 22:45:06 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  9 ++-
 .../beam/sdk/io/gcp/storage/GcsFileSystem.java  | 62 +++++++++++++++++++-
 .../sdk/io/gcp/storage/GcsFileSystemTest.java   | 50 ++++++++++++++++
 3 files changed, 119 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/36e87385/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index ea0cf9e..434baf5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -225,6 +225,13 @@ public class GcsUtil {
     return dst.toString();
   }
 
+  /**
+   * Returns true if the given {@code spec} contains glob.
+   */
+  public static boolean isGlob(GcsPath spec) {
+    return GLOB_PREFIX.matcher(spec.getObject()).matches();
+  }
+
   private GcsUtil(
       Storage storageClient,
       HttpRequestInitializer httpRequestInitializer,
@@ -250,7 +257,7 @@ public class GcsUtil {
     checkArgument(isGcsPatternSupported(gcsPattern.getObject()));
     Pattern p = null;
     String prefix = null;
-    if (!GLOB_PREFIX.matcher(gcsPattern.getObject()).matches()) {
+    if (!isGlob(gcsPattern)) {
       // Not a glob.
       try {
         // Use a get request to fetch the metadata of the object, and ignore the return value.

http://git-wip-us.apache.org/repos/asf/beam/blob/36e87385/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
index b2a712d..fac1db3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.storage;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
 import com.google.api.services.storage.model.Objects;
 import com.google.api.services.storage.model.StorageObject;
@@ -25,12 +26,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.regex.Pattern;
@@ -60,7 +63,39 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
 
   @Override
   protected List<MatchResult> match(List<String> specs) throws IOException {
-    throw new UnsupportedOperationException();
+    List<GcsPath> gcsPaths = toGcsPaths(specs);
+
+    List<GcsPath> globs = Lists.newArrayList();
+    List<GcsPath> nonGlobs = Lists.newArrayList();
+    List<Boolean> isGlobBooleans = Lists.newArrayList();
+
+    for (int i = 0; i < gcsPaths.size(); ++i) {
+      GcsPath path = gcsPaths.get(i);
+      if (GcsUtil.isGlob(path)) {
+        globs.add(path);
+        isGlobBooleans.add(true);
+      } else {
+        nonGlobs.add(path);
+        isGlobBooleans.add(false);
+      }
+    }
+
+    Iterator<MatchResult> globsMatchResults = matchGlobs(globs).iterator();
+    Iterator<MatchResult> nonGlobsMatchResults = matchNonGlobs(nonGlobs).iterator();
+
+    ImmutableList.Builder<MatchResult> ret = ImmutableList.builder();
+    for (Boolean isGlob : isGlobBooleans) {
+      if (isGlob) {
+        checkState(globsMatchResults.hasNext(), "Expect globsMatchResults has next.");
+        ret.add(globsMatchResults.next());
+      } else {
+        checkState(nonGlobsMatchResults.hasNext(), "Expect nonGlobsMatchResults has next.");
+        ret.add(nonGlobsMatchResults.next());
+      }
+    }
+    checkState(!globsMatchResults.hasNext(), "Expect no more elements in globsMatchResults.");
+    checkState(!nonGlobsMatchResults.hasNext(), "Expect no more elements in nonGlobsMatchResults.");
+    return ret.build();
   }
 
   @Override
@@ -93,6 +128,21 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
     options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds));
   }
 
+  private List<MatchResult> matchGlobs(List<GcsPath> globs) {
+    // TODO: Executes in parallel, address https://issues.apache.org/jira/browse/BEAM-1503.
+    return FluentIterable.from(globs)
+        .transform(new Function<GcsPath, MatchResult>() {
+          @Override
+          public MatchResult apply(GcsPath gcsPath) {
+            try {
+              return expand(gcsPath);
+            } catch (IOException e) {
+              return MatchResult.create(Status.ERROR, e);
+            }
+          }})
+        .toList();
+  }
+
   /**
    * Expands a pattern into {@link MatchResult}.
    *
@@ -179,4 +229,14 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
               }})
         .toList();
   }
+
+  private List<GcsPath> toGcsPaths(Collection<String> specs) {
+    return FluentIterable.from(specs)
+        .transform(new Function<String, GcsPath>() {
+          @Override
+          public GcsPath apply(String spec) {
+            return GcsPath.fromUri(spec);
+          }})
+        .toList();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/36e87385/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
index 8b8a788..b726552 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
@@ -74,6 +74,56 @@ public class GcsFileSystemTest {
   }
 
   @Test
+  public void testMatch() throws Exception {
+    Objects modelObjects = new Objects();
+    List<StorageObject> items = new ArrayList<>();
+    // A directory
+    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
+
+    // Files within the directory
+    items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */));
+    items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */));
+    items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */));
+    items.add(createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */));
+    items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 5L /* fileSize */));
+    items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 6L /* fileSize */));
+
+    modelObjects.setItems(items);
+    when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class)))
+        .thenReturn(modelObjects);
+
+    List<GcsPath> gcsPaths = ImmutableList.of(
+        GcsPath.fromUri("gs://testbucket/testdirectory/non-exist-file"),
+        GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
+
+    when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(
+        ImmutableList.of(
+            StorageObjectOrIOException.create(new FileNotFoundException()),
+            StorageObjectOrIOException.create(
+                createStorageObject("gs://testbucket/testdirectory/otherfile", 4L))));
+
+    List<String> specs = ImmutableList.of(
+        "gs://testbucket/testdirectory/file[1-3]*",
+        "gs://testbucket/testdirectory/non-exist-file",
+        "gs://testbucket/testdirectory/otherfile");
+    List<MatchResult> matchResults = gcsFileSystem.match(specs);
+    assertEquals(3, matchResults.size());
+    assertEquals(Status.OK, matchResults.get(0).status());
+    assertThat(
+        ImmutableList.of(
+            "gs://testbucket/testdirectory/file1name",
+            "gs://testbucket/testdirectory/file2name",
+            "gs://testbucket/testdirectory/file3name"),
+        contains(toFilenames(matchResults.get(0)).toArray()));
+    assertEquals(Status.NOT_FOUND, matchResults.get(1).status());
+    assertEquals(Status.OK, matchResults.get(2).status());
+    assertThat(
+        ImmutableList.of("gs://testbucket/testdirectory/otherfile"),
+        contains(toFilenames(matchResults.get(2)).toArray()));
+
+  }
+
+  @Test
   public void testGlobExpansion() throws IOException {
     Objects modelObjects = new Objects();
     List<StorageObject> items = new ArrayList<>();


Mime
View raw message