beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: [BEAM-2277] Cherrypick #3121 to release-2.0.0
Date Fri, 12 May 2017 22:04:01 GMT
Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 c807fec9b -> 7b598f878


[BEAM-2277] Cherrypick #3121 to release-2.0.0

fbb0de129d Remove '/' entirely from determining FileSystem scheme
a6a5ff7be3 [BEAM-2277] Add ResourceIdTester and test existing ResourceId implementations
ec956c85ef Mark FileSystem and related as Experimental
15df211c75 [BEAM-2277] HadoopFileSystem: normalize implementation
f3540d47f1 Rename FileSystems.setDefaultConfigInWorkers
3921163829 Fix shading of guava testlib


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e6ee1d8b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e6ee1d8b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e6ee1d8b

Branch: refs/heads/release-2.0.0
Commit: e6ee1d8b98bcee6ee8c6a80b4af6646990e0c399
Parents: c807fec
Author: Dan Halperin <dhalperi@google.com>
Authored: Fri May 12 14:59:14 2017 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Fri May 12 15:03:50 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |   4 +
 .../utils/SerializablePipelineOptions.java      |   2 +-
 runners/direct-java/pom.xml                     |   4 +
 .../utils/SerializedPipelineOptions.java        |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   2 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   6 +-
 .../options/DataflowPipelineOptionsTest.java    |   4 +-
 .../runners/dataflow/util/PackageUtilTest.java  |   2 +-
 .../spark/translation/SparkRuntimeContext.java  |   2 +-
 sdks/java/core/pom.xml                          |   4 +
 .../org/apache/beam/sdk/PipelineRunner.java     |   2 +-
 .../beam/sdk/annotations/Experimental.java      |   7 +
 .../java/org/apache/beam/sdk/io/AvroIO.java     |   4 +
 .../org/apache/beam/sdk/io/FileBasedSink.java   |  21 ++-
 .../java/org/apache/beam/sdk/io/FileSystem.java |   3 +
 .../apache/beam/sdk/io/FileSystemRegistrar.java |   3 +
 .../org/apache/beam/sdk/io/FileSystems.java     |  21 ++-
 .../beam/sdk/io/LocalFileSystemRegistrar.java   |   3 +
 .../org/apache/beam/sdk/io/LocalResources.java  |   3 +
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |   4 +
 .../java/org/apache/beam/sdk/io/TextIO.java     |   4 +
 .../org/apache/beam/sdk/io/fs/ResourceId.java   |   3 +
 .../apache/beam/sdk/testing/TestPipeline.java   |   2 +-
 .../apache/beam/sdk/io/LocalResourceIdTest.java |   6 +
 .../apache/beam/sdk/io/fs/ResourceIdTester.java | 150 +++++++++++++++++++
 .../google-cloud-platform-core/pom.xml          |   6 +
 .../gcp/storage/GcsFileSystemRegistrar.java     |   5 +-
 .../gcp/storage/GcsResourceIdTest.java          |   9 ++
 sdks/java/io/hadoop-file-system/pom.xml         |  13 ++
 .../beam/sdk/io/hdfs/HadoopFileSystem.java      |  32 ++--
 .../sdk/io/hdfs/HadoopFileSystemOptions.java    |   3 +
 .../sdk/io/hdfs/HadoopFileSystemRegistrar.java  |   3 +
 .../beam/sdk/io/hdfs/HadoopResourceId.java      |  16 +-
 .../beam/sdk/io/hdfs/HadoopFileSystemTest.java  |   5 +-
 .../beam/sdk/io/hdfs/HadoopResourceIdTest.java  |  71 +++++++++
 35 files changed, 395 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3d02096..8f96acc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1446,6 +1446,10 @@
                 <relocations>
                   <relocation>
                     <pattern>com.google.common</pattern>
+                    <excludes>
+                      <!-- com.google.common is too generic, need to exclude guava-testlib -->
+                      <exclude>com.google.common.**.testing.*</exclude>
+                    </excludes>
                     <!--suppress MavenModelInspection -->
                     <shadedPattern>
                       org.apache.${renderedArtifactId}.repackaged.com.google.common

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
index 02afa7a..46b04fc 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
@@ -62,7 +62,7 @@ public class SerializablePipelineOptions implements Externalizable {
         .as(ApexPipelineOptions.class);
 
     if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) {
-      FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+      FileSystems.setDefaultPipelineOptions(pipelineOptions);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index c971ce3..95d560c 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -131,6 +131,10 @@
                 </relocation>
                 <relocation>
                   <pattern>com.google.common</pattern>
+                  <excludes>
+                    <!-- com.google.common is too generic, need to exclude guava-testlib -->
+                    <exclude>com.google.common.**.testing.*</exclude>
+                  </excludes>
                   <shadedPattern>
                     org.apache.beam.runners.direct.repackaged.com.google.common
                   </shadedPattern>

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index 84f3bf4..40b6dd6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -56,7 +56,7 @@ public class SerializedPipelineOptions implements Serializable {
       try {
         pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
 
-        FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+        FileSystems.setDefaultPipelineOptions(pipelineOptions);
       } catch (IOException e) {
         throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 93c1e5b..87744f0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -144,7 +144,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Pipeline p = Pipeline.create(options);
 
     // Enable the FileSystems API to know about gs:// URIs in this test.
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
 
     p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
      .apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index ce01aa1..8f10b18 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -177,7 +177,7 @@ public class DataflowRunnerTest {
         .apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
 
     // Enable the FileSystems API to know about gs:// URIs in this test.
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
 
     return p;
   }
@@ -246,7 +246,7 @@ public class DataflowRunnerTest {
     options.setGcpCredential(new TestCredential());
 
     // Configure the FileSystem registrar to use these options.
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
 
     return options;
   }
@@ -771,7 +771,7 @@ public class DataflowRunnerTest {
   @Test
   public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
     options.setRunner(DataflowRunner.class);
     options.setProject("foo-12345");
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index 613604a..cea44f0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -137,7 +137,7 @@ public class DataflowPipelineOptionsTest {
   @Test
   public void testDefaultToTempLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
     options.setPathValidatorClass(NoopPathValidator.class);
     options.setTempLocation("gs://temp_location/");
     assertEquals("gs://temp_location/", options.getGcpTempLocation());
@@ -147,7 +147,7 @@ public class DataflowPipelineOptionsTest {
   @Test
   public void testDefaultToGcpTempLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
     options.setPathValidatorClass(NoopPathValidator.class);
     options.setTempLocation("gs://temp_location/");
     options.setGcpTempLocation("gs://gcp_temp_location/");

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index c7a660e..5d0c0f2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -120,7 +120,7 @@ public class PackageUtilTest {
 
     GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
     pipelineOptions.setGcsUtil(mockGcsUtil);
-    FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+    FileSystems.setDefaultPipelineOptions(pipelineOptions);
     createOptions = StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 82e8b44..f3fe99c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -90,7 +90,7 @@ public class SparkRuntimeContext implements Serializable {
           }
         }
         // Register standard FileSystems.
-        FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+        FileSystems.setDefaultPipelineOptions(pipelineOptions);
       }
       return pipelineOptions;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 35d160d..712553d 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -161,6 +161,10 @@
               <relocations>
                 <relocation>
                   <pattern>com.google.common</pattern>
+                  <excludes>
+                    <!-- com.google.common is too generic, need to exclude guava-testlib -->
+                    <exclude>com.google.common.**.testing.*</exclude>
+                  </excludes>
                   <!--suppress MavenModelInspection -->
                   <shadedPattern>
                     org.apache.beam.sdk.repackaged.com.google.common

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
index 87705af..c114501 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
@@ -41,7 +41,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
     PipelineOptionsValidator.validate(PipelineOptions.class, options);
 
     // (Re-)register standard FileSystems. Clobbers any prior credentials.
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
 
     @SuppressWarnings("unchecked")
     PipelineRunner<? extends PipelineResult> result =

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index ac02465..8224ebb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -23,6 +23,7 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
+
 /**
  * Signifies that a public API (public class, method or field) is subject to incompatible changes,
  * or even removal, in a future release.
@@ -79,6 +80,12 @@ public @interface Experimental {
     /** Metrics-related experimental APIs. */
     METRICS,
 
+    /**
+     * {@link org.apache.beam.sdk.io.FileSystem} and {@link org.apache.beam.sdk.io.fs.ResourceId}
+     * related APIs.
+     */
+    FILESYSTEM,
+
     /** Experimental feature related to alternative, unnested encodings for coders. */
     CODER_CONTEXT,
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d13c6ff..6af0e79 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -30,6 +30,8 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
@@ -313,6 +315,7 @@ public class AvroIO {
      * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be
      * overridden using {@link #withFilenamePolicy(FilenamePolicy)}.
      */
+    @Experimental(Kind.FILESYSTEM)
     public Write<T> to(ResourceId outputPrefix) {
       return toResource(StaticValueProvider.of(outputPrefix));
     }
@@ -333,6 +336,7 @@ public class AvroIO {
     /**
      * Like {@link #to(ResourceId)}.
      */
+    @Experimental(Kind.FILESYSTEM)
     public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
       return toBuilder().setFilenamePrefix(outputPrefix).build();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 7f729a7..8102316 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -47,6 +47,8 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.GZIPOutputStream;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.NullableCoder;
@@ -115,6 +117,7 @@ import org.slf4j.LoggerFactory;
  *
  * @param <T> the type of values written to the sink.
  */
+@Experimental(Kind.FILESYSTEM)
 public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
 
@@ -193,6 +196,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
    * {@code /}, {@code gs://my-bucket}, or {@code c://}. In that case, interpreting the string as a
    * file will fail and this function will return a directory {@link ResourceId} instead.
    */
+  @Experimental(Kind.FILESYSTEM)
   public static ResourceId convertToFileResourceIfPossible(String outputPrefix) {
     try {
       return FileSystems.matchNewResource(outputPrefix, false /* isDirectory */);
@@ -290,6 +294,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * as well as sharding information. The policy must return unique and consistent filenames
      * for different windows and panes.
      */
+    @Experimental(Kind.FILESYSTEM)
     public abstract ResourceId windowedFilename(
         ResourceId outputDirectory, WindowedContext c, String extension);
 
@@ -302,6 +307,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * <p>The {@link Context} object only provides sharding information, which is used by the policy
      * to generate unique and consistent filenames.
      */
+    @Experimental(Kind.FILESYSTEM)
     @Nullable public abstract ResourceId unwindowedFilename(
         ResourceId outputDirectory, Context c, String extension);
 
@@ -320,6 +326,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   /**
    * Construct a {@link FileBasedSink} with the given filename policy, producing uncompressed files.
    */
+  @Experimental(Kind.FILESYSTEM)
   public FileBasedSink(
       ValueProvider<ResourceId> baseOutputDirectoryProvider, FilenamePolicy filenamePolicy) {
     this(baseOutputDirectoryProvider, filenamePolicy, CompressionType.UNCOMPRESSED);
@@ -335,6 +342,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   /**
    * Construct a {@link FileBasedSink} with the given filename policy and output channel type.
    */
+  @Experimental(Kind.FILESYSTEM)
   public FileBasedSink(
       ValueProvider<ResourceId> baseOutputDirectoryProvider,
       FilenamePolicy filenamePolicy,
@@ -349,6 +357,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
    * Returns the base directory inside which files will be written according to the configured
    * {@link FilenamePolicy}.
    */
+  @Experimental(Kind.FILESYSTEM)
   public ValueProvider<ResourceId> getBaseOutputDirectoryProvider() {
     return baseOutputDirectoryProvider;
   }
@@ -358,6 +367,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
    * the {@link FilenamePolicy} may itself specify one or more inner directories before each output
    * file, say when writing windowed outputs in a {@code output/YYYY/MM/DD/file.txt} format.
    */
+  @Experimental(Kind.FILESYSTEM)
   public final FilenamePolicy getFilenamePolicy() {
     return filenamePolicy;
   }
@@ -424,9 +434,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     protected final ValueProvider<ResourceId> tempDirectory;
 
     /** Whether windowed writes are being used. */
-    protected  boolean windowedWrites;
+    @Experimental(Kind.FILESYSTEM)
+    protected boolean windowedWrites;
 
     /** Constructs a temporary file resource given the temporary directory and a filename. */
+    @Experimental(Kind.FILESYSTEM)
     protected static ResourceId buildTemporaryFilename(ResourceId tempDirectory, String filename)
         throws IOException {
       return tempDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
@@ -472,6 +484,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * @param sink the FileBasedSink that will be used to configure this write operation.
      * @param tempDirectory the base directory to be used for temporary output files.
      */
+    @Experimental(Kind.FILESYSTEM)
     public WriteOperation(FileBasedSink<T> sink, ResourceId tempDirectory) {
       this(sink, StaticValueProvider.of(tempDirectory));
     }
@@ -527,6 +540,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites);
     }
 
+    @Experimental(Kind.FILESYSTEM)
     protected final Map<ResourceId, ResourceId> buildOutputFilenames(
         Iterable<FileResult> writerResults) {
       int numShards = Iterables.size(writerResults);
@@ -610,6 +624,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * @param filenames the filenames of temporary files.
      */
     @VisibleForTesting
+    @Experimental(Kind.FILESYSTEM)
     final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames)
         throws IOException {
       int numFiles = filenames.size();
@@ -637,6 +652,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * temporary files, this method will remove them.
      */
     @VisibleForTesting
+    @Experimental(Kind.FILESYSTEM)
     final void removeTemporaryFiles(
         Set<ResourceId> knownFiles, boolean shouldRemoveTemporaryDirectory) throws IOException {
       ResourceId tempDir = tempDirectory.get();
@@ -945,6 +961,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     private final BoundedWindow window;
     private final PaneInfo paneInfo;
 
+    @Experimental(Kind.FILESYSTEM)
     public FileResult(ResourceId tempFilename, int shard, BoundedWindow window, PaneInfo paneInfo) {
       this.tempFilename = tempFilename;
       this.shard = shard;
@@ -952,6 +969,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       this.paneInfo = paneInfo;
     }
 
+    @Experimental(Kind.FILESYSTEM)
     public ResourceId getTempFilename() {
       return tempFilename;
     }
@@ -972,6 +990,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       return paneInfo;
     }
 
+    @Experimental(Kind.FILESYSTEM)
     public ResourceId getDestinationFile(FilenamePolicy policy, ResourceId outputDirectory,
                                          int numShards, String extension) {
       checkArgument(getShard() != UNKNOWN_SHARDNUM);

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
index 375264a..601feca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
@@ -23,6 +23,8 @@ import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.Collection;
 import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -35,6 +37,7 @@ import org.apache.beam.sdk.io.fs.ResourceId;
  * <p>All methods are protected, and they are for file system providers to implement.
  * Clients should use {@link FileSystems} utility.
  */
+@Experimental(Kind.FILESYSTEM)
 public abstract class FileSystem<ResourceIdT extends ResourceId> {
   /**
    * This is the entry point to convert user-provided specs to {@link ResourceIdT ResourceIds}.

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
index 78a91f6..50ee6eb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io;
 import com.google.auto.service.AutoService;
 import java.util.ServiceLoader;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
@@ -31,6 +33,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
  * <p>It is optional but recommended to use one of the many build time tools such as
  * {@link AutoService} to generate the necessary META-INF files automatically.
  */
+@Experimental(Kind.FILESYSTEM)
 public interface FileSystemRegistrar {
   /**
    * Create zero or more {@link FileSystem filesystems} from the given {@link PipelineOptions}.

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index 2e11177..1aacc90 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -49,6 +49,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nonnull;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
@@ -63,11 +66,12 @@ import org.apache.beam.sdk.values.KV;
 /**
  * Clients facing {@link FileSystem} utility.
  */
+@Experimental(Kind.FILESYSTEM)
 public class FileSystems {
 
   public static final String DEFAULT_SCHEME = "default";
-  private static final Pattern URI_SCHEME_PATTERN = Pattern.compile(
-      "(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*");
+  private static final Pattern FILE_SCHEME_PATTERN =
+      Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):.*");
 
   private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM =
       new AtomicReference<Map<String, FileSystem>>(
@@ -416,7 +420,7 @@ public class FileSystems {
     // from their use in the URI spec. ('*' is not reserved).
     // Here, we just need the scheme, which is so circumscribed as to be
     // very easy to extract with a regex.
-    Matcher matcher = URI_SCHEME_PATTERN.matcher(spec);
+    Matcher matcher = FILE_SCHEME_PATTERN.matcher(spec);
 
     if (!matcher.matches()) {
       return "file";
@@ -445,12 +449,21 @@ public class FileSystems {
 
   /********************************** METHODS FOR REGISTRATION **********************************/
 
+  /** @deprecated to be removed; use {@link #setDefaultPipelineOptions(PipelineOptions)}. */
+  @Deprecated // for DataflowRunner backwards compatibility.
+  public static void setDefaultConfigInWorkers(PipelineOptions options) {
+    setDefaultPipelineOptions(options);
+  }
+
   /**
    * Sets the default configuration in workers.
    *
    * <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
+   *
+   * <p>This is expected only to be used by runners after {@code Pipeline.run}, or in tests.
    */
-  public static void setDefaultConfigInWorkers(PipelineOptions options) {
+  @Internal
+  public static void setDefaultPipelineOptions(PipelineOptions options) {
     checkNotNull(options, "options");
     Set<FileSystemRegistrar> registrars =
         Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
index f182360..7896e20 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
@@ -20,12 +20,15 @@ package org.apache.beam.sdk.io;
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
  * {@link AutoService} registrar for the {@link LocalFileSystem}.
  */
 @AutoService(FileSystemRegistrar.class)
+@Experimental(Kind.FILESYSTEM)
 public class LocalFileSystemRegistrar implements FileSystemRegistrar {
   @Override
   public Iterable<FileSystem> fromOptions(@Nullable PipelineOptions options) {

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java
index 817829b..d234bae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io;
 import java.io.File;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -28,6 +30,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 /**
  * Helper functions for producing a {@link ResourceId} that references a local file or directory.
  */
+@Experimental(Kind.FILESYSTEM)
 public final class LocalResources {
 
   public static ResourceId fromFile(File file, boolean isDirectory) {

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index f73d6f3..c274595 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -31,6 +31,8 @@ import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
@@ -277,6 +279,7 @@ public class TFRecordIO {
      *
      * <p>For more information on filenames, see {@link DefaultFilenamePolicy}.
      */
+    @Experimental(Kind.FILESYSTEM)
     public Write to(ResourceId outputResource) {
       return toResource(StaticValueProvider.of(outputResource));
     }
@@ -284,6 +287,7 @@ public class TFRecordIO {
     /**
      * Like {@link #to(ResourceId)}.
      */
+    @Experimental(Kind.FILESYSTEM)
     public Write toResource(ValueProvider<ResourceId> outputResource) {
       return toBuilder().setOutputPrefix(outputResource).build();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index af6a069..5c068ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
@@ -306,6 +308,7 @@ public class TextIO {
      * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should
      * not be set.
      */
+    @Experimental(Kind.FILESYSTEM)
     public Write to(ResourceId filenamePrefix) {
       return toResource(StaticValueProvider.of(filenamePrefix));
     }
@@ -326,6 +329,7 @@ public class TextIO {
     /**
      * Like {@link #to(ResourceId)}.
      */
+    @Experimental(Kind.FILESYSTEM)
     public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
       return toBuilder().setFilenamePrefix(filenamePrefix).build();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
index dfe771f..9196034 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.fs;
 
 import java.io.Serializable;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -45,6 +47,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
  *   to generate {@link ResourceId ResourceIds} for resources that may not yet exist.
  * </ul>
  */
+@Experimental(Kind.FILESYSTEM)
 public interface ResourceId extends Serializable {
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index e04c2f8..9206e04 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -413,7 +413,7 @@ public class TestPipeline extends Pipeline implements TestRule {
       }
       options.setStableUniqueNames(CheckEnabled.ERROR);
 
-      FileSystems.setDefaultConfigInWorkers(options);
+      FileSystems.setDefaultPipelineOptions(options);
       return options;
     } catch (IOException e) {
       throw new RuntimeException(

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java
index 7ea85cf..e1ca303 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java
@@ -31,6 +31,7 @@ import java.io.File;
 import java.nio.file.Paths;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.fs.ResourceIdTester;
 import org.apache.commons.lang3.SystemUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -259,6 +260,11 @@ public class LocalResourceIdTest {
         "xyz.txt");
   }
 
+  @Test
+  public void testResourceIdTester() throws Exception {
+    ResourceIdTester.runResourceIdBattery(toResourceIdentifier("/tmp/foo/"));
+  }
+
   private LocalResourceId toResourceIdentifier(String str) throws Exception {
     boolean isDirectory;
     if (SystemUtils.IS_OS_WINDOWS) {

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java
new file mode 100644
index 0000000..8ceaeed
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY;
+import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.common.testing.EqualsTester;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.FileSystems;
+
+/**
+ * A utility to test {@link ResourceId} implementations.
+ */
+@Experimental(Kind.FILESYSTEM)
+public final class ResourceIdTester {
+  /**
+   * Enforces that {@code baseDirectory} and the children resolved from it meet the
+   * {@link ResourceId} spec; {@code baseDirectory} must be a directory.
+   */
+  public static void runResourceIdBattery(ResourceId baseDirectory) {
+    checkArgument(
+        baseDirectory.isDirectory(), "baseDirectory %s is not a directory", baseDirectory);
+
+    List<ResourceId> allResourceIds = new ArrayList<>();
+    allResourceIds.add(baseDirectory);
+
+    // Validate operations with resolving child resources; this also adds them to the list.
+    validateResolvingIds(baseDirectory, allResourceIds);
+
+    // Validate that every collected resource meets the fairly restrictive spec we have.
+    validateResourceIds(allResourceIds);
+
+    // Validate safeguards against resolving bad paths.
+    validateFailureResolvingIds(baseDirectory);
+  }
+
+  private static void validateResolvingIds(
+      ResourceId baseDirectory, List<ResourceId> allResourceIds) {
+    ResourceId file1 = baseDirectory.resolve("child1", RESOLVE_FILE);
+    ResourceId file2 = baseDirectory.resolve("child2", RESOLVE_FILE);
+    ResourceId file2a = baseDirectory.resolve("child2", RESOLVE_FILE);
+    allResourceIds.add(file1);
+    allResourceIds.add(file2);
+    assertThat("Resolved file isDirectory()", file1.isDirectory(), is(false));
+    assertThat("Resolved file isDirectory()", file2.isDirectory(), is(false));
+    assertThat("Resolved file isDirectory()", file2a.isDirectory(), is(false));
+
+    ResourceId dir1 = baseDirectory.resolve("child1", RESOLVE_DIRECTORY);
+    ResourceId dir2 = baseDirectory.resolve("child2", RESOLVE_DIRECTORY);
+    ResourceId dir2a = baseDirectory.resolve("child2", RESOLVE_DIRECTORY);
+    assertThat("Resolved directory isDirectory()", dir1.isDirectory(), is(true));
+    assertThat("Resolved directory isDirectory()", dir2.isDirectory(), is(true));
+    assertThat("Resolved directory isDirectory()", dir2a.isDirectory(), is(true));
+    allResourceIds.add(dir1);
+    allResourceIds.add(dir2);
+
+    // ResourceIds in equality groups.
+    new EqualsTester()
+        .addEqualityGroup(file1)
+        .addEqualityGroup(file2, file2a)
+        .addEqualityGroup(dir1, dir1.getCurrentDirectory())
+        .addEqualityGroup(dir2, dir2a, dir2.getCurrentDirectory())
+        .addEqualityGroup(baseDirectory, file1.getCurrentDirectory(), file2.getCurrentDirectory())
+        .testEquals();
+
+    // ResourceId toString() in equality groups.
+    new EqualsTester()
+        .addEqualityGroup(file1.toString())
+        .addEqualityGroup(file2.toString(), file2a.toString())
+        .addEqualityGroup(dir1.toString(), dir1.getCurrentDirectory().toString())
+        .addEqualityGroup(dir2.toString(), dir2a.toString(), dir2.getCurrentDirectory().toString())
+        .addEqualityGroup(
+            baseDirectory.toString(),
+            file1.getCurrentDirectory().toString(),
+            file2.getCurrentDirectory().toString())
+        .testEquals();
+
+    // TODO: test resolving strings that need to be escaped.
+    //   Possible spec: https://tools.ietf.org/html/rfc3986#section-2
+    //   May need options to be filesystem-independent, e.g., if filesystems ban certain chars.
+  }
+
+  /** Checks that {@code resolve} rejects a file name ending in '/' and a file as the base. */
+  private static void validateFailureResolvingIds(ResourceId baseDirectory) {
+    assertResolveFileFails(baseDirectory, "file/");
+    assertResolveFileFails(baseDirectory.resolve("file", RESOLVE_FILE), "file2");
+  }
+
+  /** Fails unless resolving {@code other} as a file against {@code base} throws. */
+  private static void assertResolveFileFails(ResourceId base, String other) {
+    ResourceId resolved;
+    try {
+      resolved = base.resolve(other, RESOLVE_FILE);
+    } catch (RuntimeException expected) {
+      return; // fail() below stays outside the try so its AssertionError is not swallowed
+    }
+    fail(String.format("Resolving %s against %s should have failed: %s", other, base, resolved));
+  }
+
+  private static void validateResourceIds(List<ResourceId> resourceIds) {
+    for (ResourceId resourceId : resourceIds) {
+      // ResourceIds should equal themselves.
+      assertThat("ResourceId equal to itself", resourceId, equalTo(resourceId));
+
+      // ResourceIds should be clonable via FileSystems#matchNewResource.
+      ResourceId cloned;
+      if (resourceId.isDirectory()) {
+        cloned = FileSystems.matchNewResource(resourceId.toString(), true /* isDirectory */);
+      } else {
+        cloned = FileSystems.matchNewResource(resourceId.toString(), false /* isDirectory */);
+      }
+      assertThat(
+          "ResourceId equals clone of itself", cloned, equalTo(resourceId));
+      // .. and clones have consistent toString.
+      assertThat(
+          "ResourceId toString consistency", cloned.toString(), equalTo(resourceId.toString()));
+      // .. and have consistent isDirectory.
+      assertThat(
+          "ResourceId isDirectory consistency",
+          cloned.isDirectory(),
+          equalTo(resourceId.isDirectory()));
+    }
+  }
+
+  private ResourceIdTester() {} // prevent instantiation
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/extensions/google-cloud-platform-core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/pom.xml b/sdks/java/extensions/google-cloud-platform-core/pom.xml
index 5f187c4..891c476 100644
--- a/sdks/java/extensions/google-cloud-platform-core/pom.xml
+++ b/sdks/java/extensions/google-cloud-platform-core/pom.xml
@@ -152,6 +152,12 @@
 
     <!-- test dependencies -->
     <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava-testlib</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
       <classifier>tests</classifier>

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
index 9f5980a..f3a67b2 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
@@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
 import javax.annotation.Nonnull;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.FileSystemRegistrar;
@@ -31,13 +33,14 @@ import org.apache.beam.sdk.options.PipelineOptions;
  * {@link AutoService} registrar for the {@link GcsFileSystem}.
  */
 @AutoService(FileSystemRegistrar.class)
+@Experimental(Kind.FILESYSTEM)
 public class GcsFileSystemRegistrar implements FileSystemRegistrar {
 
   @Override
   public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
     checkNotNull(
         options,
-        "Expect the runner have called FileSystems.setDefaultConfigInWorkers().");
+        "Expect the runner have called FileSystems.setDefaultPipelineOptions().");
     return ImmutableList.<FileSystem>of(new GcsFileSystem(options.as(GcsOptions.class)));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
index b245610..c1e214e 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
@@ -22,8 +22,11 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.fs.ResourceIdTester;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.junit.Rule;
 import org.junit.Test;
@@ -163,6 +166,12 @@ public class GcsResourceIdTest {
         "xyz.txt");
   }
 
+  @Test
+  public void testResourceIdTester() throws Exception {
+    FileSystems.setDefaultPipelineOptions(TestPipeline.testingPipelineOptions());
+    ResourceIdTester.runResourceIdBattery(toResourceIdentifier("gs://bucket/foo/"));
+  }
+
   private GcsResourceId toResourceIdentifier(String str) throws Exception {
     return GcsResourceId.fromGcsPath(GcsPath.fromUri(str));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml
index 46f3e32..b90ccf0 100644
--- a/sdks/java/io/hadoop-file-system/pom.xml
+++ b/sdks/java/io/hadoop-file-system/pom.xml
@@ -157,6 +157,19 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava-testlib</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index 154a818..d519a8c 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
@@ -82,8 +81,9 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
         List<Metadata> metadata = new ArrayList<>();
         for (FileStatus fileStatus : fileStatuses) {
           if (fileStatus.isFile()) {
+            URI uri = dropEmptyAuthority(fileStatus.getPath().toUri().toString());
             metadata.add(Metadata.builder()
-                .setResourceId(new HadoopResourceId(fileStatus.getPath().toUri()))
+                .setResourceId(new HadoopResourceId(uri))
                 .setIsReadSeekEfficient(true)
                 .setSizeBytes(fileStatus.getLen())
                 .build());
@@ -151,19 +151,13 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
 
   @Override
   protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
-    try {
-      if (singleResourceSpec.endsWith("/") && !isDirectory) {
-        throw new IllegalArgumentException(String.format(
-            "Expected file path but received directory path %s", singleResourceSpec));
-      }
-      return !singleResourceSpec.endsWith("/") && isDirectory
-          ? new HadoopResourceId(new URI(singleResourceSpec + "/"))
-          : new HadoopResourceId(new URI(singleResourceSpec));
-    } catch (URISyntaxException e) {
-      throw new IllegalArgumentException(
-          String.format("Invalid spec %s directory %s", singleResourceSpec, isDirectory),
-          e);
+    if (singleResourceSpec.endsWith("/") && !isDirectory) {
+      throw new IllegalArgumentException(String.format(
+          "Expected file path but received directory path %s", singleResourceSpec));
     }
+    return !singleResourceSpec.endsWith("/") && isDirectory
+        ? new HadoopResourceId(dropEmptyAuthority(singleResourceSpec + "/"))
+        : new HadoopResourceId(dropEmptyAuthority(singleResourceSpec));
   }
 
   @Override
@@ -237,4 +231,30 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
       inputStream.close();
     }
   }
+
+  /**
+   * Parses {@code uriStr}, rewriting an empty authority ({@code scheme:///path}) to the
+   * authority-less form ({@code scheme:/path}). Any other input is parsed unchanged.
+   *
+   * @throws IllegalArgumentException if {@code uriStr} is not a valid URI
+   */
+  private static URI dropEmptyAuthority(String uriStr) {
+    URI uri = URI.create(uriStr);
+    String scheme = uri.getScheme();
+    if (scheme == null) {
+      return uri; // relative spec: there is no "scheme:///" prefix to rewrite
+    }
+    String prefix = scheme + ":///";
+    if (!uriStr.startsWith(prefix)) {
+      return uri;
+    }
+    // Drop any further leading slashes too: "scheme:////a" must not become "scheme://a",
+    // which would turn the first path segment into an authority.
+    String path = uriStr.substring(prefix.length());
+    int start = 0;
+    while (start < path.length() && path.charAt(start) == '/') {
+      start++;
+    }
+    return URI.create(scheme + ":/" + path.substring(start));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
index 45f43e2..f5183d5 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
@@ -25,6 +25,8 @@ import java.io.File;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
@@ -38,6 +40,7 @@ import org.slf4j.LoggerFactory;
  * {@link PipelineOptions} which encapsulate {@link Configuration Hadoop Configuration}
  * for the {@link HadoopFileSystem}.
  */
+@Experimental(Kind.FILESYSTEM)
 public interface HadoopFileSystemOptions extends PipelineOptions {
   @Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. "
       + "By default, Hadoop configuration is loaded from 'core-site.xml' and 'hdfs-site.xml' "

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
index 9159df3..8c57089 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nonnull;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.FileSystemRegistrar;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -34,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
  * {@link AutoService} registrar for the {@link HadoopFileSystem}.
  */
 @AutoService(FileSystemRegistrar.class)
+@Experimental(Kind.FILESYSTEM)
 public class HadoopFileSystemRegistrar implements FileSystemRegistrar {
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
index e570864..88fa32a 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
@@ -17,9 +17,12 @@
  */
 package org.apache.beam.sdk.io.hdfs;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.net.URI;
 import java.util.Objects;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.hadoop.fs.Path;
 
@@ -35,7 +38,20 @@ class HadoopResourceId implements ResourceId {
 
   @Override
   public ResourceId resolve(String other, ResolveOptions resolveOptions) {
+    // Only a directory can have children resolved against it; without this guard URI
+    // resolution would silently return a sibling of this file resource.
+    if (!isDirectory()) {
+      throw new IllegalStateException("Cannot resolve against non-directory resource " + uri);
+    }
+    if (resolveOptions == StandardResolveOptions.RESOLVE_DIRECTORY) {
+      other = other.endsWith("/") ? other : other + '/';
+    } else if (resolveOptions == StandardResolveOptions.RESOLVE_FILE) {
+      checkArgument(!other.endsWith("/"), "Resolving a file with a directory path: %s", other);
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("Unexpected StandardResolveOptions %s", resolveOptions));
+    }
     return new HadoopResourceId(uri.resolve(other));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index cf86c36..88275f4 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -63,14 +63,13 @@ public class HadoopFileSystemTest {
   @Rule public TestPipeline p = TestPipeline.create();
   @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
   @Rule public ExpectedException thrown = ExpectedException.none();
-  private Configuration configuration;
   private MiniDFSCluster hdfsCluster;
   private URI hdfsClusterBaseUri;
   private HadoopFileSystem fileSystem;
 
   @Before
   public void setUp() throws Exception {
-    configuration = new Configuration();
+    Configuration configuration = new Configuration();
     configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
     MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
     hdfsCluster = builder.build();
@@ -220,7 +219,7 @@ public class HadoopFileSystemTest {
     HadoopFileSystemOptions options = TestPipeline.testingPipelineOptions()
         .as(HadoopFileSystemOptions.class);
     options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf()));
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
     PCollection<String> pc = p.apply(
         TextIO.read().from(testPath("testFile*").toString()));
     PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC");

http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
new file mode 100644
index 0000000..c4a8577
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.net.URI;
+import java.util.Collections;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.fs.ResourceIdTester;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for {@link HadoopResourceId}.
+ */
+public class HadoopResourceIdTest {
+
+  private MiniDFSCluster hdfsCluster;
+  private URI hdfsClusterBaseUri;
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration configuration = new Configuration();
+    configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
+    hdfsCluster = builder.build();
+    hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/");
+
+    // Register HadoopFileSystem for this test.
+    HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
+    options.setHdfsConfiguration(Collections.singletonList(configuration));
+    FileSystems.setDefaultPipelineOptions(options);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    hdfsCluster.shutdown();
+  }
+
+  @Test
+  public void testResourceIdTester() throws Exception {
+    ResourceId baseDirectory =
+        FileSystems.matchNewResource(
+            "hdfs://" + hdfsClusterBaseUri.getPath(), true /* isDirectory */);
+    ResourceIdTester.runResourceIdBattery(baseDirectory);
+  }
+}


Mime
View raw message