beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] beam git commit: [BEAM-2005] Move getScheme from FileSystemRegistrar to FileSystem
Date Fri, 28 Apr 2017 16:00:38 GMT
Repository: beam
Updated Branches:
  refs/heads/master 849e39dfc -> ddabab3fa


[BEAM-2005] Move getScheme from FileSystemRegistrar to FileSystem

Note that I needed to update FileSystems to instantiate the FileSystem(s) upfront instead
of remembering the mapping from scheme to registrar.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ce88c88b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ce88c88b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ce88c88b

Branch: refs/heads/master
Commit: ce88c88b14e963ac17fac83dd19495c835c1f6cb
Parents: 849e39d
Author: Luke Cwik <lcwik@google.com>
Authored: Thu Apr 27 18:13:43 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Fri Apr 28 08:59:59 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/FileSystem.java |  7 ++
 .../apache/beam/sdk/io/FileSystemRegistrar.java | 15 +---
 .../org/apache/beam/sdk/io/FileSystems.java     | 92 +++++++++-----------
 .../org/apache/beam/sdk/io/LocalFileSystem.java |  5 ++
 .../beam/sdk/io/LocalFileSystemRegistrar.java   | 15 +---
 .../org/apache/beam/sdk/io/LocalResourceId.java |  2 +-
 .../org/apache/beam/sdk/io/FileSystemsTest.java | 20 +----
 .../sdk/io/LocalFileSystemRegistrarTest.java    |  6 ++
 .../beam/sdk/io/gcp/storage/GcsFileSystem.java  |  5 ++
 .../io/gcp/storage/GcsFileSystemRegistrar.java  | 12 +--
 .../beam/sdk/io/gcp/storage/GcsResourceId.java  |  2 +-
 .../gcp/storage/GcsFileSystemRegistrarTest.java | 11 +--
 .../beam/sdk/io/hdfs/HadoopFileSystem.java      |  5 ++
 .../sdk/io/hdfs/HadoopFileSystemRegistrar.java  | 11 +--
 .../io/hdfs/HadoopFileSystemRegistrarTest.java  | 12 +--
 15 files changed, 100 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
index 76c5dc1..375264a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
@@ -151,4 +151,11 @@ public abstract class FileSystem<ResourceIdT extends ResourceId> {
    * such as when the specified {@code singleResourceSpec} is not a valid resource name.
    */
   protected abstract ResourceIdT matchNewResource(String singleResourceSpec, boolean isDirectory);
+
+  /**
+   * Get the URI scheme which defines the namespace of the {@link FileSystem}.
+   *
+   * @see <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
+   */
+  protected abstract String getScheme();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
index 1d81c1e..78a91f6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
@@ -33,17 +33,10 @@ import org.apache.beam.sdk.options.PipelineOptions;
  */
 public interface FileSystemRegistrar {
   /**
-   * Create a {@link FileSystem} from the given {@link PipelineOptions}.
-   */
-  FileSystem fromOptions(@Nullable PipelineOptions options);
-
-  /**
-   * Get the URI scheme which defines the namespace of the {@link FileSystemRegistrar}.
-   *
-   * <p>The scheme is required to be unique among all
-   * {@link FileSystemRegistrar FileSystemRegistrars}.
+   * Create zero or more {@link FileSystem filesystems} from the given {@link PipelineOptions}.
    *
-   * @see <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
+   * <p>Each {@link FileSystem#getScheme() scheme} is required to be unique among all
+   * {@link FileSystem}s registered by all {@link FileSystemRegistrar}s.
    */
-  String getScheme();
+  Iterable<FileSystem> fromOptions(@Nullable PipelineOptions options);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index b290498..532a42f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -25,6 +25,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
@@ -43,7 +44,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.ServiceLoader;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nonnull;
@@ -67,14 +68,8 @@ public class FileSystems {
   private static final Pattern URI_SCHEME_PATTERN = Pattern.compile(
       "(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*");
 
-  private static final Map<String, FileSystemRegistrar> SCHEME_TO_REGISTRAR =
-      new ConcurrentHashMap<>();
-
-  private static PipelineOptions defaultConfig;
-
-  static {
-    loadFileSystemRegistrars();
-  }
+  private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM =
+      new AtomicReference<>();
 
   /********************************** METHODS FOR CLIENT **********************************/
 
@@ -402,88 +397,81 @@ public class FileSystems {
     Matcher matcher = URI_SCHEME_PATTERN.matcher(spec);
 
     if (!matcher.matches()) {
-      return LocalFileSystemRegistrar.LOCAL_FILE_SCHEME;
+      return "file";
     } else {
       return matcher.group("scheme").toLowerCase();
     }
   }
 
   /**
-   * Internal method to get {@link FileSystem} for {@code spec}.
+   * Internal method to get {@link FileSystem} for {@code scheme}.
    */
   @VisibleForTesting
   static FileSystem getFileSystemInternal(String scheme) {
-    return getRegistrarInternal(scheme.toLowerCase()).fromOptions(defaultConfig);
-  }
-
-  /**
-   * Internal method to get {@link FileSystemRegistrar} for {@code scheme}.
-   */
-  @VisibleForTesting
-  static FileSystemRegistrar getRegistrarInternal(String scheme) {
     String lowerCaseScheme = scheme.toLowerCase();
-    if (SCHEME_TO_REGISTRAR.containsKey(lowerCaseScheme)) {
-      return SCHEME_TO_REGISTRAR.get(lowerCaseScheme);
-    } else if (SCHEME_TO_REGISTRAR.containsKey(DEFAULT_SCHEME)) {
-      return SCHEME_TO_REGISTRAR.get(DEFAULT_SCHEME);
-    } else {
-      throw new IllegalStateException("Unable to find registrar for " + scheme);
+    Map<String, FileSystem> schemeToFileSystem = SCHEME_TO_FILESYSTEM.get();
+    FileSystem rval = schemeToFileSystem.get(lowerCaseScheme);
+    if (rval != null) {
+      return rval;
     }
+    rval = schemeToFileSystem.get(DEFAULT_SCHEME);
+    if (rval != null) {
+      return rval;
+    }
+    throw new IllegalStateException("Unable to find registrar for " + scheme);
   }
 
   /********************************** METHODS FOR REGISTRATION **********************************/
 
   /**
-   * Loads available {@link FileSystemRegistrar} services.
+   * Sets the default configuration in workers.
+   *
+   * <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
    */
-  private static void loadFileSystemRegistrars() {
-    SCHEME_TO_REGISTRAR.clear();
+  public static void setDefaultConfigInWorkers(PipelineOptions options) {
+    checkNotNull(options, "options");
     Set<FileSystemRegistrar> registrars =
         Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
     registrars.addAll(Lists.newArrayList(
         ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader())));
 
-    verifySchemesAreUnique(registrars);
-
-    for (FileSystemRegistrar registrar : registrars) {
-      SCHEME_TO_REGISTRAR.put(registrar.getScheme().toLowerCase(), registrar);
-    }
-  }
-
-  /**
-   * Sets the default configuration in workers.
-   *
-   * <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
-   */
-  public static void setDefaultConfigInWorkers(PipelineOptions options) {
-    defaultConfig = checkNotNull(options, "options");
+    SCHEME_TO_FILESYSTEM.set(verifySchemesAreUnique(options, registrars));
   }
 
   @VisibleForTesting
-  static void verifySchemesAreUnique(Set<FileSystemRegistrar> registrars) {
-    Multimap<String, FileSystemRegistrar> registrarsBySchemes =
+  static Map<String, FileSystem> verifySchemesAreUnique(
+      PipelineOptions options, Set<FileSystemRegistrar> registrars) {
+    Multimap<String, FileSystem> fileSystemsBySchemes =
         TreeMultimap.create(Ordering.<String>natural(), Ordering.arbitrary());
 
     for (FileSystemRegistrar registrar : registrars) {
-      registrarsBySchemes.put(registrar.getScheme().toLowerCase(), registrar);
+      for (FileSystem fileSystem : registrar.fromOptions(options)) {
+        fileSystemsBySchemes.put(fileSystem.getScheme(), fileSystem);
+      }
     }
-    for (Entry<String, Collection<FileSystemRegistrar>> entry
-        : registrarsBySchemes.asMap().entrySet()) {
+    for (Entry<String, Collection<FileSystem>> entry
+        : fileSystemsBySchemes.asMap().entrySet()) {
       if (entry.getValue().size() > 1) {
-        String conflictingRegistrars = Joiner.on(", ").join(
+        String conflictingFileSystems = Joiner.on(", ").join(
             FluentIterable.from(entry.getValue())
-                .transform(new Function<FileSystemRegistrar, String>() {
+                .transform(new Function<FileSystem, String>() {
                   @Override
-                  public String apply(@Nonnull FileSystemRegistrar input) {
+                  public String apply(@Nonnull FileSystem input) {
                     return input.getClass().getName();
                   }})
                 .toSortedList(Ordering.<String>natural()));
         throw new IllegalStateException(String.format(
-            "Scheme: [%s] has conflicting registrars: [%s]",
+            "Scheme: [%s] has conflicting filesystems: [%s]",
             entry.getKey(),
-            conflictingRegistrars));
+            conflictingFileSystems));
       }
     }
+
+    ImmutableMap.Builder<String, FileSystem> schemeToFileSystem = ImmutableMap.builder();
+    for (Entry<String, FileSystem> entry : fileSystemsBySchemes.entries()) {
+      schemeToFileSystem.put(entry.getKey(), entry.getValue());
+    }
+    return schemeToFileSystem.build();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
index 2d80ae4..235b77d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -171,6 +171,11 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
     return LocalResourceId.fromPath(path, isDirectory);
   }
 
+  @Override
+  protected String getScheme() {
+    return "file";
+  }
+
   private MatchResult matchOne(String spec) throws IOException {
     File file = Paths.get(spec).toFile();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
index 75a38e8..f182360 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
@@ -18,24 +18,17 @@
 package org.apache.beam.sdk.io;
 
 import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
- * {@link AutoService} registrar for the {@link FileSystem}.
+ * {@link AutoService} registrar for the {@link LocalFileSystem}.
  */
 @AutoService(FileSystemRegistrar.class)
 public class LocalFileSystemRegistrar implements FileSystemRegistrar {
-
-  static final String LOCAL_FILE_SCHEME = "file";
-
-  @Override
-  public FileSystem fromOptions(@Nullable PipelineOptions options) {
-    return new LocalFileSystem();
-  }
-
   @Override
-  public String getScheme() {
-    return LOCAL_FILE_SCHEME;
+  public Iterable<FileSystem> fromOptions(@Nullable PipelineOptions options) {
+    return ImmutableList.<FileSystem>of(new LocalFileSystem());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
index 9aa765b..b67ec46 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
@@ -113,7 +113,7 @@ class LocalResourceId implements ResourceId {
 
   @Override
   public String getScheme() {
-    return LocalFileSystemRegistrar.LOCAL_FILE_SCHEME;
+    return "file";
   }
 
   Path getPath() {

http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
index 8cfa3dc..a75c54d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
@@ -27,7 +27,6 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
-
 import java.io.Writer;
 import java.nio.channels.Channels;
 import java.nio.charset.StandardCharsets;
@@ -35,12 +34,10 @@ import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
-import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MoveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.commons.lang3.SystemUtils;
 import org.junit.Rule;
@@ -77,21 +74,12 @@ public class FileSystemsTest {
   @Test
   public void testVerifySchemesAreUnique() throws Exception {
     thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Scheme: [file] has conflicting registrars");
+    thrown.expectMessage("Scheme: [file] has conflicting filesystems");
     FileSystems.verifySchemesAreUnique(
+        PipelineOptionsFactory.create(),
         Sets.<FileSystemRegistrar>newHashSet(
             new LocalFileSystemRegistrar(),
-            new FileSystemRegistrar() {
-              @Override
-              public FileSystem fromOptions(@Nullable PipelineOptions options) {
-                return null;
-              }
-
-              @Override
-              public String getScheme() {
-                return "FILE";
-              }
-            }));
+            new LocalFileSystemRegistrar()));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
index e4e8326..0b1729d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
@@ -17,10 +17,14 @@
  */
 package org.apache.beam.sdk.io;
 
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
 import java.util.ServiceLoader;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -36,6 +40,8 @@ public class LocalFileSystemRegistrarTest {
     for (FileSystemRegistrar registrar
         : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
       if (registrar instanceof LocalFileSystemRegistrar) {
+        Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create());
+        assertThat(fileSystems, contains(instanceOf(LocalFileSystem.class)));
         return;
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
index 1b0bd9d..ff71f3c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
@@ -145,6 +145,11 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
     options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds));
   }
 
+  @Override
+  protected String getScheme() {
+    return "gs";
+  }
+
   private List<MatchResult> matchGlobs(List<GcsPath> globs) {
     // TODO: Executes in parallel, address https://issues.apache.org/jira/browse/BEAM-1503.
     return FluentIterable.from(globs)

http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
index 31df0e1..1d4e4ad 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.storage;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
 import javax.annotation.Nonnull;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.io.FileSystem;
@@ -32,18 +33,11 @@ import org.apache.beam.sdk.options.PipelineOptions;
 @AutoService(FileSystemRegistrar.class)
 public class GcsFileSystemRegistrar implements FileSystemRegistrar {
 
-  static final String GCS_SCHEME = "gs";
-
   @Override
-  public FileSystem fromOptions(@Nonnull PipelineOptions options) {
+  public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
     checkNotNull(
         options,
         "Expect the runner have called FileSystems.setDefaultConfigInWorkers().");
-    return new GcsFileSystem(options.as(GcsOptions.class));
-  }
-
-  @Override
-  public String getScheme() {
-    return GCS_SCHEME;
+    return ImmutableList.<FileSystem>of(new GcsFileSystem(options.as(GcsOptions.class)));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
index a1ac827..38dcaf5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
@@ -88,7 +88,7 @@ public class GcsResourceId implements ResourceId {
 
   @Override
   public String getScheme() {
-    return GcsFileSystemRegistrar.GCS_SCHEME;
+    return "gs";
   }
 
   GcsPath getGcsPath() {

http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java
index ecac8f6..2fc337a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java
@@ -17,13 +17,14 @@
  */
 package org.apache.beam.sdk.io.gcp.storage;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
 import java.util.ServiceLoader;
-
+import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.FileSystemRegistrar;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Test;
@@ -41,8 +42,8 @@ public class GcsFileSystemRegistrarTest {
     for (FileSystemRegistrar registrar
         : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
       if (registrar instanceof GcsFileSystemRegistrar) {
-        assertEquals("gs", registrar.getScheme());
-        assertTrue(registrar.fromOptions(PipelineOptionsFactory.create()) instanceof GcsFileSystem);
+        Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create());
+        assertThat(fileSystems, contains(instanceOf(GcsFileSystem.class)));
         return;
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index ca56a60..a8bdd44 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -73,4 +73,9 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
   protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  protected String getScheme() {
+    return "hdfs";
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
index 1471cb0..cc22f4f 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
@@ -18,10 +18,10 @@
 package org.apache.beam.sdk.io.hdfs;
 
 import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
 import javax.annotation.Nonnull;
 import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.FileSystemRegistrar;
-import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
@@ -31,12 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 public class HadoopFileSystemRegistrar implements FileSystemRegistrar {
 
   @Override
-  public FileSystem fromOptions(@Nonnull PipelineOptions options) {
-    return new HadoopFileSystem();
-  }
-
-  @Override
-  public String getScheme() {
-    return FileSystems.DEFAULT_SCHEME;
+  public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
+    return ImmutableList.<FileSystem>of(new HadoopFileSystem());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ce88c88b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
index 22a439a..c332af5 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
@@ -17,14 +17,15 @@
  */
 package org.apache.beam.sdk.io.hdfs;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
 import java.util.ServiceLoader;
+import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.FileSystemRegistrar;
-import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -41,9 +42,8 @@ public class HadoopFileSystemRegistrarTest {
     for (FileSystemRegistrar registrar
         : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
       if (registrar instanceof HadoopFileSystemRegistrar) {
-        assertEquals(FileSystems.DEFAULT_SCHEME, registrar.getScheme());
-        assertTrue(
-            registrar.fromOptions(PipelineOptionsFactory.create()) instanceof HadoopFileSystem);
+        Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create());
+        assertThat(fileSystems, contains(instanceOf(HadoopFileSystem.class)));
         return;
       }
     }


Mime
View raw message