beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [3/4] incubator-beam git commit: [BEAM-59] Use ServiceLoader to register IOChannelFactories in IOChannelUtils.
Date Tue, 22 Nov 2016 14:31:22 GMT
[BEAM-59] Use ServiceLoader to register IOChannelFactories in IOChannelUtils.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa417f9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa417f9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa417f9c

Branch: refs/heads/master
Commit: fa417f9c2c671626eba3326e82d47741000ec64d
Parents: cd1a5e7
Author: Pei He <peihe@google.com>
Authored: Mon Oct 31 18:02:49 2016 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Tue Nov 22 06:18:55 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |   2 +-
 .../options/DataflowPipelineOptionsTest.java    |   6 +-
 .../runners/dataflow/util/PackageUtilTest.java  |   2 +-
 .../sdk/options/PipelineOptionsFactory.java     |  32 +----
 .../apache/beam/sdk/runners/PipelineRunner.java |   2 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |   2 +-
 .../beam/sdk/util/FileIOChannelFactory.java     |  10 +-
 .../sdk/util/IOChannelFactoryRegistrar.java     |  11 +-
 .../apache/beam/sdk/util/IOChannelUtils.java    | 133 ++++++++++++++++++-
 .../beam/sdk/util/common/ReflectHelpers.java    |  29 ++++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |   2 +-
 .../apache/beam/sdk/io/FileBasedSourceTest.java |   2 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java |   2 +-
 .../sdk/options/PipelineOptionsFactoryTest.java |  34 -----
 .../util/FileIOChannelFactoryRegistrarTest.java |   4 +-
 .../beam/sdk/util/FileIOChannelFactoryTest.java |   2 +-
 .../util/GcsIOChannelFactoryRegistrarTest.java  |   4 +-
 .../beam/sdk/util/IOChannelUtilsTest.java       |  39 ++++++
 .../sdk/util/common/ReflectHelpersTest.java     |  33 +++++
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   6 +-
 20 files changed, 259 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 841b13f..36328e9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -240,7 +240,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
    */
   public static DataflowRunner fromOptions(PipelineOptions options) {
     // (Re-)register standard IO factories. Clobbers any prior credentials.
-    IOChannelUtils.registerStandardIOFactories(options);
+    IOChannelUtils.registerIOFactoriesAllowOverride(options);
 
     DataflowPipelineOptions dataflowOptions =
         PipelineOptionsValidator.validate(DataflowPipelineOptions.class, options);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index 202d04b..52082e0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -126,7 +126,7 @@ public class DataflowPipelineOptionsTest {
   @Test
   public void testStagingLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    IOChannelUtils.registerStandardIOFactories(options);
+    IOChannelUtils.registerIOFactoriesAllowOverride(options);
     options.setTempLocation("file://temp_location");
     options.setStagingLocation("gs://staging_location");
     assertTrue(isNullOrEmpty(options.getGcpTempLocation()));
@@ -136,7 +136,7 @@ public class DataflowPipelineOptionsTest {
   @Test
   public void testDefaultToTempLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    IOChannelUtils.registerStandardIOFactories(options);
+    IOChannelUtils.registerIOFactoriesAllowOverride(options);
     options.setPathValidatorClass(NoopPathValidator.class);
     options.setTempLocation("gs://temp_location");
     assertEquals("gs://temp_location", options.getGcpTempLocation());
@@ -146,7 +146,7 @@ public class DataflowPipelineOptionsTest {
   @Test
   public void testDefaultToGcpTempLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    IOChannelUtils.registerStandardIOFactories(options);
+    IOChannelUtils.registerIOFactoriesAllowOverride(options);
     options.setPathValidatorClass(NoopPathValidator.class);
     options.setTempLocation("gs://temp_location");
     options.setGcpTempLocation("gs://gcp_temp_location");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 02aceef..05a87dd 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -135,7 +135,7 @@ public class PackageUtilTest {
     GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
     pipelineOptions.setGcsUtil(mockGcsUtil);
 
-    IOChannelUtils.registerStandardIOFactories(pipelineOptions);
+    IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
   }
 
   private File makeFileWithContents(String name, String contents) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index 304e166..6009867 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -481,23 +481,6 @@ public class PipelineOptionsFactory {
   /** The width at which options should be output. */
   private static final int TERMINAL_WIDTH = 80;
 
-  /**
-   * Finds the appropriate {@code ClassLoader} to be used by the
-   * {@link ServiceLoader#load} call, which by default would use the context
-   * {@code ClassLoader}, which can be null. The fallback is as follows: context
-   * ClassLoader, class ClassLoader and finaly the system ClassLoader.
-   */
-  static ClassLoader findClassLoader() {
-    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-    if (classLoader == null) {
-      classLoader = PipelineOptionsFactory.class.getClassLoader();
-    }
-    if (classLoader == null) {
-      classLoader = ClassLoader.getSystemClassLoader();
-    }
-    return classLoader;
-  }
-
   static {
     try {
       IGNORED_METHODS = ImmutableSet.<Method>builder()
@@ -514,10 +497,10 @@ public class PipelineOptionsFactory {
       throw new ExceptionInInitializerError(e);
     }
 
-    CLASS_LOADER = findClassLoader();
+    CLASS_LOADER = ReflectHelpers.findClassLoader();
 
     Set<PipelineRunnerRegistrar> pipelineRunnerRegistrars =
-        Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
+        Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
     pipelineRunnerRegistrars.addAll(
         Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class, CLASS_LOADER)));
     // Store the list of all available pipeline runners.
@@ -579,7 +562,7 @@ public class PipelineOptionsFactory {
   private static void initializeRegistry() {
     register(PipelineOptions.class);
     Set<PipelineOptionsRegistrar> pipelineOptionsRegistrars =
-        Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
+        Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
     pipelineOptionsRegistrars.addAll(
         Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class, CLASS_LOADER)));
     for (PipelineOptionsRegistrar registrar : pipelineOptionsRegistrars) {
@@ -1390,15 +1373,6 @@ public class PipelineOptionsFactory {
     }
   }
 
-  /** A {@link Comparator} that uses the object's classes canonical name to compare them. */
-  private static class ObjectsClassComparator implements Comparator<Object> {
-    static final ObjectsClassComparator INSTANCE = new ObjectsClassComparator();
-    @Override
-    public int compare(Object o1, Object o2) {
-      return o1.getClass().getCanonicalName().compareTo(o2.getClass().getCanonicalName());
-    }
-  }
-
   /** A {@link Comparator} that uses the generic method signature to sort them. */
   private static class MethodComparator implements Comparator<Method> {
     static final MethodComparator INSTANCE = new MethodComparator();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index ede1507..77f5128 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -48,7 +48,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
     checkNotNull(options);
 
     // (Re-)register standard IO factories. Clobbers any prior credentials.
-    IOChannelUtils.registerStandardIOFactories(gcsOptions);
+    IOChannelUtils.registerIOFactoriesAllowOverride(gcsOptions);
 
     @SuppressWarnings("unchecked")
     PipelineRunner<? extends PipelineResult> result =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index f1bf09d..493d4cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -152,7 +152,7 @@ public class TestPipeline extends Pipeline {
       }
       options.setStableUniqueNames(CheckEnabled.ERROR);
 
-      IOChannelUtils.registerStandardIOFactories(options);
+      IOChannelUtils.registerIOFactoriesAllowOverride(options);
       return options;
     } catch (IOException e) {
       throw new RuntimeException("Unable to instantiate test options from system property "

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
index 13591a3..dd81a34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
@@ -44,6 +44,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.regex.Matcher;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,14 +58,7 @@ public class FileIOChannelFactory implements IOChannelFactory {
    /**
    * Create a {@link FileIOChannelFactory} with the given {@link PipelineOptions}.
    */
-  public static FileIOChannelFactory fromOptions(PipelineOptions options) {
-    return create();
-  }
-
-  /**
-   * Create a {@link FileIOChannelFactory}.
-   */
-  public static FileIOChannelFactory create() {
+  public static FileIOChannelFactory fromOptions(@Nullable PipelineOptions options) {
     return new FileIOChannelFactory();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
index 93752a4..7776b13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
@@ -22,7 +22,7 @@ import java.util.ServiceLoader;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
- * A registrar that creates {@link IOChannelFactory} from {@link PipelineOptions}.
+ * A registrar that creates {@link IOChannelFactory} instances from {@link PipelineOptions}.
  *
  * <p>{@link IOChannelFactory} creators have the ability to provide a registrar by creating
  * a {@link ServiceLoader} entry and a concrete implementation of this interface.
@@ -32,12 +32,17 @@ import org.apache.beam.sdk.options.PipelineOptions;
  */
 public interface IOChannelFactoryRegistrar {
   /**
-   * Create a {@link IOChannelFactory} with the given {@link PipelineOptions}.
+   * Create a {@link IOChannelFactory} from the given {@link PipelineOptions}.
    */
   IOChannelFactory fromOptions(PipelineOptions options);
 
   /**
-   * Get the scheme.
+   * Get the URI scheme which defines the namespace of the IOChannelFactoryRegistrar.
+   *
+   * <p>The scheme is required to be unique among all
+   * {@link IOChannelFactoryRegistrar IOChannelFactoryRegistrars}.
+   *
+   * @see <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
    */
   String getScheme();
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
index d221fa9..d60ee97 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
@@ -17,19 +17,33 @@
  */
 package org.apache.beam.sdk.util;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
 import java.text.DecimalFormat;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 
 /**
  * Provides utilities for creating read and write channels.
@@ -42,6 +56,8 @@ public class IOChannelUtils {
   // Pattern that matches shard placeholders within a shard template.
   private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");
 
+  private static final ClassLoader CLASS_LOADER = ReflectHelpers.findClassLoader();
+
   /**
    * Associates a scheme with an {@link IOChannelFactory}.
    *
@@ -50,18 +66,123 @@ public class IOChannelUtils {
    *
    * <p>For example, when reading from "gs://bucket/path", the scheme "gs" is
    * used to lookup the appropriate factory.
+   *
+   * <p>{@link PipelineOptions} are required to provide dependencies and
+   * pipeline level configuration to the individual {@link IOChannelFactory IOChannelFactories}.
+   *
+   * @throws IllegalStateException if multiple {@link IOChannelFactory IOChannelFactories}
+   * for the same scheme are detected.
    */
-  public static void setIOFactory(String scheme, IOChannelFactory factory) {
+  @VisibleForTesting
+  public static void setIOFactoryInternal(
+      String scheme,
+      IOChannelFactory factory,
+      boolean override) {
+    if (!override && FACTORY_MAP.containsKey(scheme)) {
+      throw new IllegalStateException(String.format(
+          "Failed to register IOChannelFactory: %s. "
+              + "Scheme: [%s] is already registered with %s, and override is not allowed.",
+          FACTORY_MAP.get(scheme).getClass(),
+          scheme,
+          factory.getClass()));
+    }
     FACTORY_MAP.put(scheme, factory);
   }
 
   /**
-   * Registers standard factories globally. This requires {@link PipelineOptions}
-   * to provide, e.g., credentials for GCS.
+   * Deregisters the scheme and the associated {@link IOChannelFactory}.
+   */
+  @VisibleForTesting
+  static void deregisterScheme(String scheme) {
+    FACTORY_MAP.remove(scheme);
+  }
+
+  /**
+   * Registers standard factories globally.
+   *
+   * <p>{@link PipelineOptions} are required to provide dependencies and
+   * pipeline level configuration to the individual {@link IOChannelFactory IOChannelFactories}.
+   *
+   * @deprecated use {@link #registerIOFactories}.
    */
+  @Deprecated
   public static void registerStandardIOFactories(PipelineOptions options) {
-    setIOFactory("gs", GcsIOChannelFactory.fromOptions(options));
-    setIOFactory("file", FileIOChannelFactory.fromOptions(options));
+    registerIOFactoriesAllowOverride(options);
+  }
+
+  /**
+   * Registers all {@link IOChannelFactory IOChannelFactories} from {@link ServiceLoader}.
+   *
+   * <p>{@link PipelineOptions} are required to provide dependencies and
+   * pipeline level configuration to the individual {@link IOChannelFactory IOChannelFactories}.
+   *
+   * <p>Multiple {@link IOChannelFactory IOChannelFactories} for the same scheme are not allowed.
+   *
+   * @throws IllegalStateException if multiple {@link IOChannelFactory IOChannelFactories}
+   * for the same scheme are detected.
+   */
+  public static void registerIOFactories(PipelineOptions options) {
+    registerIOFactoriesInternal(options, false /* override */);
+  }
+
+  /**
+   * Registers all {@link IOChannelFactory IOChannelFactories} from {@link ServiceLoader}.
+   *
+   * <p>This requires {@link PipelineOptions} to provide, e.g., credentials for GCS.
+   *
+   * <p>Overriding existing schemes is allowed.
+   *
+   * @deprecated This is currently to provide different configurations for tests and
+   * is still public for IOChannelFactory redesign purposes.
+   */
+  @Deprecated
+  @VisibleForTesting
+  public static void registerIOFactoriesAllowOverride(PipelineOptions options) {
+    registerIOFactoriesInternal(options, true /* override */);
+  }
+
+  private static void registerIOFactoriesInternal(
+      PipelineOptions options, boolean override) {
+    Set<IOChannelFactoryRegistrar> registrars =
+        Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
+    registrars.addAll(Lists.newArrayList(
+        ServiceLoader.load(IOChannelFactoryRegistrar.class, CLASS_LOADER)));
+
+    checkDuplicateScheme(registrars);
+
+    for (IOChannelFactoryRegistrar registrar : registrars) {
+      setIOFactoryInternal(
+          registrar.getScheme(),
+          registrar.fromOptions(options),
+          override);
+    }
+  }
+
+  @VisibleForTesting
+  static void checkDuplicateScheme(Set<IOChannelFactoryRegistrar> registrars) {
+    Multimap<String, IOChannelFactoryRegistrar> registrarsBySchemes =
+        TreeMultimap.create(Ordering.<String>natural(), Ordering.arbitrary());
+
+    for (IOChannelFactoryRegistrar registrar : registrars) {
+      registrarsBySchemes.put(registrar.getScheme(), registrar);
+    }
+    for (Entry<String, Collection<IOChannelFactoryRegistrar>> entry
+        : registrarsBySchemes.asMap().entrySet()) {
+      if (entry.getValue().size() > 1) {
+        String conflictingRegistrars = Joiner.on(", ").join(
+            FluentIterable.from(entry.getValue())
+                .transform(new Function<IOChannelFactoryRegistrar, String>() {
+                  @Override
+                  public String apply(@Nonnull IOChannelFactoryRegistrar input) {
+                    return input.getClass().getName();
+                  }})
+                .toSortedList(Ordering.<String>natural()));
+        throw new IllegalStateException(String.format(
+            "Scheme: [%s] has conflicting registrars: [%s]",
+            entry.getKey(),
+            conflictingRegistrars));
+      }
+    }
   }
 
   /**
@@ -174,7 +295,7 @@ public class IOChannelUtils {
     Matcher matcher = URI_SCHEME_PATTERN.matcher(spec);
 
     if (!matcher.matches()) {
-      return FileIOChannelFactory.create();
+      return FileIOChannelFactory.fromOptions(null);
     }
 
     String scheme = matcher.group("scheme");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
index 2b08fee..637e8e3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
@@ -34,9 +34,12 @@ import java.lang.reflect.Type;
 import java.lang.reflect.TypeVariable;
 import java.lang.reflect.WildcardType;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Queue;
+import java.util.ServiceLoader;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.IOChannelUtils;
 
 /**
  * Utilities for working with with {@link Class Classes} and {@link Method Methods}.
@@ -167,6 +170,15 @@ public class ReflectHelpers {
     }
   };
 
+  /** A {@link Comparator} that uses the object's classes canonical name to compare them. */
+  public static class ObjectsClassComparator implements Comparator<Object> {
+    public static final ObjectsClassComparator INSTANCE = new ObjectsClassComparator();
+    @Override
+    public int compare(Object o1, Object o2) {
+      return o1.getClass().getCanonicalName().compareTo(o2.getClass().getCanonicalName());
+    }
+  }
+
   /**
    * Returns all the methods visible from the provided interfaces.
    *
@@ -203,4 +215,21 @@ public class ReflectHelpers {
     }
     return builder.build();
   }
+
+  /**
+   * Finds the appropriate {@code ClassLoader} to be used by the
+   * {@link ServiceLoader#load} call, which by default would use the context
+   * {@code ClassLoader}, which can be null. The fallback is as follows: context
+   * ClassLoader, class ClassLoader and finally the system ClassLoader.
+   */
+  public static ClassLoader findClassLoader() {
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    if (classLoader == null) {
+      classLoader = IOChannelUtils.class.getClassLoader();
+    }
+    if (classLoader == null) {
+      classLoader = ClassLoader.getSystemClassLoader();
+    }
+    return classLoader;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 1a07177..41a630f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -79,7 +79,7 @@ public class AvroIOTest {
 
   @BeforeClass
   public static void setupClass() {
-    IOChannelUtils.registerStandardIOFactories(TestPipeline.testingPipelineOptions());
+    IOChannelUtils.registerIOFactoriesAllowOverride(TestPipeline.testingPipelineOptions());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index 5208910..dde5d02 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -462,7 +462,7 @@ public class FileBasedSourceTest {
                 new File(parent, "file1").getPath(),
                 new File(parent, "file2").getPath(),
                 new File(parent, "file3").getPath()));
-    IOChannelUtils.setIOFactory("mocked", mockIOFactory);
+    IOChannelUtils.setIOFactoryInternal("mocked", mockIOFactory, true /* override */);
 
     List<String> data2 = createStringDataset(3, 50);
     createFileWithData("file2", data2);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index dc71693..d3a5d5e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -175,7 +175,7 @@ public class TextIOTest {
 
   @BeforeClass
   public static void setupClass() throws IOException {
-    IOChannelUtils.registerStandardIOFactories(TestPipeline.testingPipelineOptions());
+    IOChannelUtils.registerIOFactoriesAllowOverride(TestPipeline.testingPipelineOptions());
     tempFolder = Files.createTempDirectory("TextIOTest");
     // empty files
     emptyTxt = writeToFile(EMPTY, "empty.txt", CompressionType.UNCOMPRESSED);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index 0a2324f..7ff4a92 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -1461,40 +1461,6 @@ public class PipelineOptionsFactoryTest {
         containsString("The pipeline runner that will be used to execute the pipeline."));
   }
 
-  @Test
-  public void testFindProperClassLoaderIfContextClassLoaderIsNull() throws InterruptedException {
-    final ClassLoader[] classLoader = new ClassLoader[1];
-    Thread thread = new Thread(new Runnable() {
-
-      @Override
-      public void run() {
-        classLoader[0] = PipelineOptionsFactory.findClassLoader();
-      }
-    });
-    thread.setContextClassLoader(null);
-    thread.start();
-    thread.join();
-    assertEquals(PipelineOptionsFactory.class.getClassLoader(), classLoader[0]);
-  }
-
-  @Test
-  public void testFindProperClassLoaderIfContextClassLoaderIsAvailable()
-      throws InterruptedException {
-    final ClassLoader[] classLoader = new ClassLoader[1];
-    Thread thread = new Thread(new Runnable() {
-
-      @Override
-      public void run() {
-        classLoader[0] = PipelineOptionsFactory.findClassLoader();
-      }
-    });
-    ClassLoader cl = new ClassLoader() {};
-    thread.setContextClassLoader(cl);
-    thread.start();
-    thread.join();
-    assertEquals(cl, classLoader[0]);
-  }
-
   private static class RegisteredTestRunner extends PipelineRunner<PipelineResult> {
     public static PipelineRunner<PipelineResult> fromOptions(PipelineOptions options) {
       return new RegisteredTestRunner();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
index 4600d81..f8f53e7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
@@ -33,8 +33,8 @@ public class FileIOChannelFactoryRegistrarTest {
 
   @Test
   public void testServiceLoader() {
-    for (IOChannelFactoryRegistrar registrar :
-        Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
+    for (IOChannelFactoryRegistrar registrar
+        : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
       if (registrar instanceof FileIOChannelFactoryRegistrar) {
         return;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
index e27a043..38be65a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
@@ -46,7 +46,7 @@ import org.junit.runners.JUnit4;
 public class FileIOChannelFactoryTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
   @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
-  private FileIOChannelFactory factory = FileIOChannelFactory.create();
+  private FileIOChannelFactory factory = FileIOChannelFactory.fromOptions(null);
 
   private void testCreate(Path path) throws Exception {
     String expected = "my test string";

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
index 32bd4fc..a29dd45 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
@@ -33,8 +33,8 @@ public class GcsIOChannelFactoryRegistrarTest {
 
   @Test
   public void testServiceLoader() {
-    for (IOChannelFactoryRegistrar registrar :
-        Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
+    for (IOChannelFactoryRegistrar registrar
+        : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
       if (registrar instanceof GcsIOChannelFactoryRegistrar) {
         return;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
index d92d3cd..6dfa4c7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
@@ -19,15 +19,19 @@ package org.apache.beam.sdk.util;
 
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
+import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import java.io.File;
 import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -40,6 +44,9 @@ public class IOChannelUtilsTest {
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
   @Test
   public void testShardFormatExpansion() {
     assertEquals("output-001-of-123.txt",
@@ -106,4 +113,36 @@ public class IOChannelUtilsTest {
     assertEquals(expected,
         IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "aa", "bb", "cc"));
   }
+
+  @Test
+  public void testRegisterIOFactoriesAllowOverride() throws Exception {
+    IOChannelUtils.registerIOFactoriesAllowOverride(PipelineOptionsFactory.create());
+    IOChannelUtils.registerIOFactoriesAllowOverride(PipelineOptionsFactory.create());
+    assertNotNull(IOChannelUtils.getFactory("gs"));
+    assertNotNull(IOChannelUtils.getFactory("file"));
+  }
+
+  @Test
+  public void testRegisterIOFactories() throws Exception {
+    IOChannelUtils.deregisterScheme("gs");
+    IOChannelUtils.deregisterScheme("file");
+
+    IOChannelUtils.registerIOFactories(PipelineOptionsFactory.create());
+    assertNotNull(IOChannelUtils.getFactory("gs"));
+    assertNotNull(IOChannelUtils.getFactory("file"));
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Failed to register IOChannelFactory");
+    thrown.expectMessage("override is not allowed");
+    IOChannelUtils.registerIOFactories(PipelineOptionsFactory.create());
+  }
+
+  @Test
+  public void testCheckDuplicateScheme() throws Exception {
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Scheme: [file] has conflicting registrars");
+    IOChannelUtils.checkDuplicateScheme(
+        Sets.<IOChannelFactoryRegistrar>newHashSet(
+            new FileIOChannelFactoryRegistrar(),
+            new FileIOChannelFactoryRegistrar()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
index 8a1708c..5fae25f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
@@ -148,4 +148,37 @@ public class ReflectHelpersTest {
             Options.class.getMethod("getObject").getAnnotations()[0]));
   }
 
+  @Test
+  public void testFindProperClassLoaderIfContextClassLoaderIsNull() throws InterruptedException {
+    final ClassLoader[] classLoader = new ClassLoader[1];
+    Thread thread = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        classLoader[0] = ReflectHelpers.findClassLoader();
+      }
+    });
+    thread.setContextClassLoader(null);
+    thread.start();
+    thread.join();
+    assertEquals(ReflectHelpers.class.getClassLoader(), classLoader[0]);
+  }
+
+  @Test
+  public void testFindProperClassLoaderIfContextClassLoaderIsAvailable()
+      throws InterruptedException {
+    final ClassLoader[] classLoader = new ClassLoader[1];
+    Thread thread = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        classLoader[0] = ReflectHelpers.findClassLoader();
+      }
+    });
+    ClassLoader cl = new ClassLoader() {};
+    thread.setContextClassLoader(cl);
+    thread.start();
+    thread.join();
+    assertEquals(cl, classLoader[0]);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 51a69a2..40965e4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1420,7 +1420,7 @@ public class BigQueryIOTest implements Serializable {
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setTempLocation("mock://tempLocation");
 
-    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+    IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
         .thenReturn("mock://tempLocation/output");
     when(mockDatasetService.getTable(anyString(), anyString(), anyString()))
@@ -1501,7 +1501,7 @@ public class BigQueryIOTest implements Serializable {
         eq(destinationTable.getDatasetId()),
         eq(destinationTable.getTableId())))
         .thenReturn(new Table().setSchema(new TableSchema()));
-    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+    IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
         .thenReturn("mock://tempLocation/output");
     when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
@@ -1584,7 +1584,7 @@ public class BigQueryIOTest implements Serializable {
         eq(destinationTable.getDatasetId()),
         eq(destinationTable.getTableId())))
         .thenReturn(new Table().setSchema(new TableSchema()));
-    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+    IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
         .thenReturn("mock://tempLocation/output");
     when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))



Mime
View raw message