From: kenn@apache.org
To: commits@beam.incubator.apache.org
Reply-To: dev@beam.incubator.apache.org
Date: Tue, 22 Nov 2016 20:11:51 -0000
Message-Id: <4b03a4f94eca4f34a5a3e3fcf1ff8d12@git.apache.org>
In-Reply-To: <7214e37c8bdc48a0b2b7ad12120c227b@git.apache.org>
References: <7214e37c8bdc48a0b2b7ad12120c227b@git.apache.org>
Subject: [47/50] incubator-beam git commit: [BEAM-59] Use ServiceLoader to register IOChannelFactories in IOChannelUtils.

[BEAM-59] Use ServiceLoader to register IOChannelFactories in IOChannelUtils.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa417f9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa417f9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa417f9c

Branch: refs/heads/python-sdk
Commit: fa417f9c2c671626eba3326e82d47741000ec64d
Parents: cd1a5e7
Author: Pei He
Authored: Mon Oct 31 18:02:49 2016 -0700
Committer: Luke Cwik
Committed: Tue Nov 22 06:18:55 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |   2 +-
 .../options/DataflowPipelineOptionsTest.java    |   6 +-
 .../runners/dataflow/util/PackageUtilTest.java  |   2 +-
 .../sdk/options/PipelineOptionsFactory.java     |  32 +----
 .../apache/beam/sdk/runners/PipelineRunner.java |   2 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |   2 +-
 .../beam/sdk/util/FileIOChannelFactory.java     |  10 +-
 .../sdk/util/IOChannelFactoryRegistrar.java     |  11 +-
 .../apache/beam/sdk/util/IOChannelUtils.java    | 133 ++++++++++++++++++-
 .../beam/sdk/util/common/ReflectHelpers.java    |  29 ++++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |   2 +-
 .../apache/beam/sdk/io/FileBasedSourceTest.java |   2 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java |   2 +-
 .../sdk/options/PipelineOptionsFactoryTest.java |  34 -----
 .../util/FileIOChannelFactoryRegistrarTest.java |   4 +-
 .../beam/sdk/util/FileIOChannelFactoryTest.java |   2 +-
 .../util/GcsIOChannelFactoryRegistrarTest.java  |   4 +-
 .../beam/sdk/util/IOChannelUtilsTest.java       |  39 ++++++
 .../sdk/util/common/ReflectHelpersTest.java     |  33 +++++
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   6 +-
 20 files changed, 259 insertions(+), 98 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 841b13f..36328e9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -240,7 +240,7 @@ public class DataflowRunner extends PipelineRunner { */ public static DataflowRunner fromOptions(PipelineOptions options) { // (Re-)register standard IO factories. Clobbers any prior credentials. - IOChannelUtils.registerStandardIOFactories(options); + IOChannelUtils.registerIOFactoriesAllowOverride(options); DataflowPipelineOptions dataflowOptions = PipelineOptionsValidator.validate(DataflowPipelineOptions.class, options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java index 202d04b..52082e0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java @@ -126,7 +126,7 @@ public class DataflowPipelineOptionsTest { @Test public void testStagingLocation() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - 
IOChannelUtils.registerStandardIOFactories(options); + IOChannelUtils.registerIOFactoriesAllowOverride(options); options.setTempLocation("file://temp_location"); options.setStagingLocation("gs://staging_location"); assertTrue(isNullOrEmpty(options.getGcpTempLocation())); @@ -136,7 +136,7 @@ public class DataflowPipelineOptionsTest { @Test public void testDefaultToTempLocation() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - IOChannelUtils.registerStandardIOFactories(options); + IOChannelUtils.registerIOFactoriesAllowOverride(options); options.setPathValidatorClass(NoopPathValidator.class); options.setTempLocation("gs://temp_location"); assertEquals("gs://temp_location", options.getGcpTempLocation()); @@ -146,7 +146,7 @@ public class DataflowPipelineOptionsTest { @Test public void testDefaultToGcpTempLocation() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - IOChannelUtils.registerStandardIOFactories(options); + IOChannelUtils.registerIOFactoriesAllowOverride(options); options.setPathValidatorClass(NoopPathValidator.class); options.setTempLocation("gs://temp_location"); options.setGcpTempLocation("gs://gcp_temp_location"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index 02aceef..05a87dd 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -135,7 +135,7 @@ public 
class PackageUtilTest { GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); pipelineOptions.setGcsUtil(mockGcsUtil); - IOChannelUtils.registerStandardIOFactories(pipelineOptions); + IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions); } private File makeFileWithContents(String name, String contents) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index 304e166..6009867 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -481,23 +481,6 @@ public class PipelineOptionsFactory { /** The width at which options should be output. */ private static final int TERMINAL_WIDTH = 80; - /** - * Finds the appropriate {@code ClassLoader} to be used by the - * {@link ServiceLoader#load} call, which by default would use the context - * {@code ClassLoader}, which can be null. The fallback is as follows: context - * ClassLoader, class ClassLoader and finaly the system ClassLoader. 
- */ - static ClassLoader findClassLoader() { - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - if (classLoader == null) { - classLoader = PipelineOptionsFactory.class.getClassLoader(); - } - if (classLoader == null) { - classLoader = ClassLoader.getSystemClassLoader(); - } - return classLoader; - } - static { try { IGNORED_METHODS = ImmutableSet.builder() @@ -514,10 +497,10 @@ public class PipelineOptionsFactory { throw new ExceptionInInitializerError(e); } - CLASS_LOADER = findClassLoader(); + CLASS_LOADER = ReflectHelpers.findClassLoader(); Set pipelineRunnerRegistrars = - Sets.newTreeSet(ObjectsClassComparator.INSTANCE); + Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); pipelineRunnerRegistrars.addAll( Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class, CLASS_LOADER))); // Store the list of all available pipeline runners. @@ -579,7 +562,7 @@ public class PipelineOptionsFactory { private static void initializeRegistry() { register(PipelineOptions.class); Set pipelineOptionsRegistrars = - Sets.newTreeSet(ObjectsClassComparator.INSTANCE); + Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); pipelineOptionsRegistrars.addAll( Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class, CLASS_LOADER))); for (PipelineOptionsRegistrar registrar : pipelineOptionsRegistrars) { @@ -1390,15 +1373,6 @@ public class PipelineOptionsFactory { } } - /** A {@link Comparator} that uses the object's classes canonical name to compare them. */ - private static class ObjectsClassComparator implements Comparator { - static final ObjectsClassComparator INSTANCE = new ObjectsClassComparator(); - @Override - public int compare(Object o1, Object o2) { - return o1.getClass().getCanonicalName().compareTo(o2.getClass().getCanonicalName()); - } - } - /** A {@link Comparator} that uses the generic method signature to sort them. 
*/ private static class MethodComparator implements Comparator { static final MethodComparator INSTANCE = new MethodComparator(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java index ede1507..77f5128 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java @@ -48,7 +48,7 @@ public abstract class PipelineRunner { checkNotNull(options); // (Re-)register standard IO factories. Clobbers any prior credentials. - IOChannelUtils.registerStandardIOFactories(gcsOptions); + IOChannelUtils.registerIOFactoriesAllowOverride(gcsOptions); @SuppressWarnings("unchecked") PipelineRunner result = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index f1bf09d..493d4cc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -152,7 +152,7 @@ public class TestPipeline extends Pipeline { } options.setStableUniqueNames(CheckEnabled.ERROR); - IOChannelUtils.registerStandardIOFactories(options); + IOChannelUtils.registerIOFactoriesAllowOverride(options); return options; } catch (IOException e) { throw new RuntimeException("Unable to instantiate test options from system property " 
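The PipelineOptionsFactory hunks above replace a local findClassLoader() with ReflectHelpers.findClassLoader(). The fallback order described in its Javadoc (context ClassLoader, then the class's own ClassLoader, then the system ClassLoader) can be sketched in isolation as below. This is only an illustrative sketch: the class name ClassLoaderFallbackSketch and the helper resolveWithContext are hypothetical and are not part of the commit.

```java
/** Hypothetical, standalone sketch of the class loader fallback chain. */
final class ClassLoaderFallbackSketch {
  private ClassLoaderFallbackSketch() {}

  /** Returns the context loader, else this class's loader, else the system loader. */
  static ClassLoader findClassLoader() {
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    if (classLoader == null) {
      // The context class loader may be null, e.g. on threads that cleared it.
      classLoader = ClassLoaderFallbackSketch.class.getClassLoader();
    }
    if (classLoader == null) {
      // getClassLoader() returns null for classes loaded by the bootstrap loader.
      classLoader = ClassLoader.getSystemClassLoader();
    }
    return classLoader;
  }

  /** Illustration helper (an assumption): resolves with a temporary context loader. */
  static ClassLoader resolveWithContext(ClassLoader context) {
    Thread current = Thread.currentThread();
    ClassLoader saved = current.getContextClassLoader();
    current.setContextClassLoader(context);
    try {
      return findClassLoader();
    } finally {
      // Always restore the caller's context class loader.
      current.setContextClassLoader(saved);
    }
  }

  public static void main(String[] args) {
    ClassLoader custom = new ClassLoader() {};
    // A non-null context loader is returned unchanged.
    System.out.println(resolveWithContext(custom) == custom);
    // With a null context loader, the class's own loader is the first fallback.
    System.out.println(
        resolveWithContext(null) == ClassLoaderFallbackSketch.class.getClassLoader());
  }
}
```

The result is meant to be passed as the second argument of ServiceLoader.load(Class, ClassLoader), as the static initializer in the diff does with CLASS_LOADER.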
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java index 13591a3..dd81a34 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java @@ -44,6 +44,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.regex.Matcher; +import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,14 +58,7 @@ public class FileIOChannelFactory implements IOChannelFactory { /** * Create a {@link FileIOChannelFactory} with the given {@link PipelineOptions}. */ - public static FileIOChannelFactory fromOptions(PipelineOptions options) { - return create(); - } - - /** - * Create a {@link FileIOChannelFactory}. 
- */ - public static FileIOChannelFactory create() { + public static FileIOChannelFactory fromOptions(@Nullable PipelineOptions options) { return new FileIOChannelFactory(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java index 93752a4..7776b13 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java @@ -22,7 +22,7 @@ import java.util.ServiceLoader; import org.apache.beam.sdk.options.PipelineOptions; /** - * A registrar that creates {@link IOChannelFactory} from {@link PipelineOptions}. + * A registrar that creates {@link IOChannelFactory} instances from {@link PipelineOptions}. * *

<p>{@link IOChannelFactory} creators have the ability to provide a registrar by creating * a {@link ServiceLoader} entry and a concrete implementation of this interface. @@ -32,12 +32,17 @@ import org.apache.beam.sdk.options.PipelineOptions; */ public interface IOChannelFactoryRegistrar { /** - * Create a {@link IOChannelFactory} with the given {@link PipelineOptions}. + * Create a {@link IOChannelFactory} from the given {@link PipelineOptions}. */ IOChannelFactory fromOptions(PipelineOptions options); /** - * Get the scheme. + * Get the URI scheme which defines the namespace of the IOChannelFactoryRegistrar. + * + *

The scheme is required to be unique among all + * {@link IOChannelFactoryRegistrar IOChannelFactoryRegistrars}. + * + * @see RFC 2396 */ String getScheme(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java index d221fa9..d60ee97 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java @@ -17,19 +17,33 @@ */ package org.apache.beam.sdk.util; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; +import com.google.common.collect.TreeMultimap; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.channels.WritableByteChannel; import java.text.DecimalFormat; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Map.Entry; +import java.util.ServiceLoader; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.annotation.Nonnull; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.common.ReflectHelpers; /** * Provides utilities for creating read and write channels. @@ -42,6 +56,8 @@ public class IOChannelUtils { // Pattern that matches shard placeholders within a shard template. 
private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)"); + private static final ClassLoader CLASS_LOADER = ReflectHelpers.findClassLoader(); + /** * Associates a scheme with an {@link IOChannelFactory}. * @@ -50,18 +66,123 @@ public class IOChannelUtils { * *

<p>For example, when reading from "gs://bucket/path", the scheme "gs" is * used to lookup the appropriate factory. + * + *

{@link PipelineOptions} are required to provide dependencies and + * pipeline level configuration to the individual {@link IOChannelFactory IOChannelFactories}. + * + * @throws IllegalStateException if multiple {@link IOChannelFactory IOChannelFactories} + * for the same scheme are detected. */ - public static void setIOFactory(String scheme, IOChannelFactory factory) { + @VisibleForTesting + public static void setIOFactoryInternal( + String scheme, + IOChannelFactory factory, + boolean override) { + if (!override && FACTORY_MAP.containsKey(scheme)) { + throw new IllegalStateException(String.format( + "Failed to register IOChannelFactory: %s. " + + "Scheme: [%s] is already registered with %s, and override is not allowed.", + FACTORY_MAP.get(scheme).getClass(), + scheme, + factory.getClass())); + } FACTORY_MAP.put(scheme, factory); } /** - * Registers standard factories globally. This requires {@link PipelineOptions} - * to provide, e.g., credentials for GCS. + * Deregisters the scheme and the associated {@link IOChannelFactory}. + */ + @VisibleForTesting + static void deregisterScheme(String scheme) { + FACTORY_MAP.remove(scheme); + } + + /** + * Registers standard factories globally. + * + *

{@link PipelineOptions} are required to provide dependencies and + * pipeline level configuration to the individual {@link IOChannelFactory IOChannelFactories}. + * + * @deprecated use {@link #registerIOFactories}. */ + @Deprecated public static void registerStandardIOFactories(PipelineOptions options) { - setIOFactory("gs", GcsIOChannelFactory.fromOptions(options)); - setIOFactory("file", FileIOChannelFactory.fromOptions(options)); + registerIOFactoriesAllowOverride(options); + } + + /** + * Registers all {@link IOChannelFactory IOChannelFactories} from {@link ServiceLoader}. + * + *

<p>{@link PipelineOptions} are required to provide dependencies and + * pipeline level configuration to the individual {@link IOChannelFactory IOChannelFactories}. + * + *

<p>Multiple {@link IOChannelFactory IOChannelFactories} for the same scheme are not allowed. + * + * @throws IllegalStateException if multiple {@link IOChannelFactory IOChannelFactories} + * for the same scheme are detected. + */ + public static void registerIOFactories(PipelineOptions options) { + registerIOFactoriesInternal(options, false /* override */); + } + + /** + * Registers all {@link IOChannelFactory IOChannelFactories} from {@link ServiceLoader}. + * + *

<p>This requires {@link PipelineOptions} to provide, e.g., credentials for GCS. + * + *

Override existing schemes is allowed. + * + * @deprecated This is currently to provide different configurations for tests and + * is still public for IOChannelFactory redesign purposes. + */ + @Deprecated + @VisibleForTesting + public static void registerIOFactoriesAllowOverride(PipelineOptions options) { + registerIOFactoriesInternal(options, true /* override */); + } + + private static void registerIOFactoriesInternal( + PipelineOptions options, boolean override) { + Set registrars = + Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); + registrars.addAll(Lists.newArrayList( + ServiceLoader.load(IOChannelFactoryRegistrar.class, CLASS_LOADER))); + + checkDuplicateScheme(registrars); + + for (IOChannelFactoryRegistrar registrar : registrars) { + setIOFactoryInternal( + registrar.getScheme(), + registrar.fromOptions(options), + override); + } + } + + @VisibleForTesting + static void checkDuplicateScheme(Set registrars) { + Multimap registrarsBySchemes = + TreeMultimap.create(Ordering.natural(), Ordering.arbitrary()); + + for (IOChannelFactoryRegistrar registrar : registrars) { + registrarsBySchemes.put(registrar.getScheme(), registrar); + } + for (Entry> entry + : registrarsBySchemes.asMap().entrySet()) { + if (entry.getValue().size() > 1) { + String conflictingRegistrars = Joiner.on(", ").join( + FluentIterable.from(entry.getValue()) + .transform(new Function() { + @Override + public String apply(@Nonnull IOChannelFactoryRegistrar input) { + return input.getClass().getName(); + }}) + .toSortedList(Ordering.natural())); + throw new IllegalStateException(String.format( + "Scheme: [%s] has conflicting registrars: [%s]", + entry.getKey(), + conflictingRegistrars)); + } + } } /** @@ -174,7 +295,7 @@ public class IOChannelUtils { Matcher matcher = URI_SCHEME_PATTERN.matcher(spec); if (!matcher.matches()) { - return FileIOChannelFactory.create(); + return FileIOChannelFactory.fromOptions(null); } String scheme = matcher.group("scheme"); 
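The registration flow in the IOChannelUtils hunks above (reject duplicate schemes among the discovered registrars, then register each factory with or without override) can be sketched without Beam or Guava as follows. This is a rough illustration under stated assumptions, not the commit's code: SchemeRegistrySketch, Registrar, SimpleRegistrar, createFactory, and lookup are hypothetical names, a String stands in for an IOChannelFactory, and the registrars are passed in as a list rather than discovered through ServiceLoader.load, which would need META-INF/services entries.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of scheme-keyed factory registration with duplicate checks. */
final class SchemeRegistrySketch {
  /** Stand-in for IOChannelFactoryRegistrar; a String plays the role of the factory. */
  interface Registrar {
    String getScheme();
    String createFactory();
  }

  /** Trivial registrar used for illustration only. */
  static final class SimpleRegistrar implements Registrar {
    private final String scheme;
    private final String factory;

    SimpleRegistrar(String scheme, String factory) {
      this.scheme = scheme;
      this.factory = factory;
    }

    @Override public String getScheme() { return scheme; }
    @Override public String createFactory() { return factory; }
  }

  private final Map<String, String> factories = new HashMap<>();

  /** Registers a factory; refuses to replace an existing scheme unless override is set. */
  void setFactory(String scheme, String factory, boolean override) {
    if (!override && factories.containsKey(scheme)) {
      throw new IllegalStateException(String.format(
          "Failed to register %s. Scheme: [%s] is already registered with %s.",
          factory, scheme, factories.get(scheme)));
    }
    factories.put(scheme, factory);
  }

  /** Fails fast if two registrars in the same batch claim the same scheme. */
  static void checkDuplicateScheme(List<? extends Registrar> registrars) {
    Map<String, List<String>> namesByScheme = new TreeMap<>();
    for (Registrar registrar : registrars) {
      namesByScheme
          .computeIfAbsent(registrar.getScheme(), k -> new ArrayList<>())
          .add(registrar.getClass().getName());
    }
    for (Map.Entry<String, List<String>> entry : namesByScheme.entrySet()) {
      if (entry.getValue().size() > 1) {
        List<String> names = new ArrayList<>(entry.getValue());
        Collections.sort(names); // sorted so the message is deterministic
        throw new IllegalStateException(String.format(
            "Scheme: [%s] has conflicting registrars: [%s]",
            entry.getKey(), String.join(", ", names)));
      }
    }
  }

  /** Checks the batch for conflicts, then registers every factory. */
  void registerAll(List<? extends Registrar> registrars, boolean override) {
    checkDuplicateScheme(registrars);
    for (Registrar registrar : registrars) {
      setFactory(registrar.getScheme(), registrar.createFactory(), override);
    }
  }

  /** Returns the factory registered for the scheme, or null if none. */
  String lookup(String scheme) {
    return factories.get(scheme);
  }
}
```

The two checks cover different cases: checkDuplicateScheme catches conflicts within a single batch of registrars, while the override flag governs whether a later registration may replace one that already exists in the map.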
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java index 2b08fee..637e8e3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java @@ -34,9 +34,12 @@ import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; import java.lang.reflect.WildcardType; import java.util.Arrays; +import java.util.Comparator; import java.util.Queue; +import java.util.ServiceLoader; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.beam.sdk.util.IOChannelUtils; /** * Utilities for working with with {@link Class Classes} and {@link Method Methods}. @@ -167,6 +170,15 @@ public class ReflectHelpers { } }; + /** A {@link Comparator} that uses the object's classes canonical name to compare them. */ + public static class ObjectsClassComparator implements Comparator { + public static final ObjectsClassComparator INSTANCE = new ObjectsClassComparator(); + @Override + public int compare(Object o1, Object o2) { + return o1.getClass().getCanonicalName().compareTo(o2.getClass().getCanonicalName()); + } + } + /** * Returns all the methods visible from the provided interfaces. * @@ -203,4 +215,21 @@ public class ReflectHelpers { } return builder.build(); } + + /** + * Finds the appropriate {@code ClassLoader} to be used by the + * {@link ServiceLoader#load} call, which by default would use the context + * {@code ClassLoader}, which can be null. The fallback is as follows: context + * ClassLoader, class ClassLoader and finaly the system ClassLoader. 
+ */ + public static ClassLoader findClassLoader() { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + if (classLoader == null) { + classLoader = IOChannelUtils.class.getClassLoader(); + } + if (classLoader == null) { + classLoader = ClassLoader.getSystemClassLoader(); + } + return classLoader; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 1a07177..41a630f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -79,7 +79,7 @@ public class AvroIOTest { @BeforeClass public static void setupClass() { - IOChannelUtils.registerStandardIOFactories(TestPipeline.testingPipelineOptions()); + IOChannelUtils.registerIOFactoriesAllowOverride(TestPipeline.testingPipelineOptions()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java index 5208910..dde5d02 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java @@ -462,7 +462,7 @@ public class FileBasedSourceTest { new File(parent, "file1").getPath(), new File(parent, "file2").getPath(), new File(parent, "file3").getPath())); - IOChannelUtils.setIOFactory("mocked", mockIOFactory); + IOChannelUtils.setIOFactoryInternal("mocked", 
mockIOFactory, true /* override */);
     List data2 = createStringDataset(3, 50);
     createFileWithData("file2", data2);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index dc71693..d3a5d5e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -175,7 +175,7 @@ public class TextIOTest {
   @BeforeClass
   public static void setupClass() throws IOException {
-    IOChannelUtils.registerStandardIOFactories(TestPipeline.testingPipelineOptions());
+    IOChannelUtils.registerIOFactoriesAllowOverride(TestPipeline.testingPipelineOptions());
     tempFolder = Files.createTempDirectory("TextIOTest");
     // empty files
     emptyTxt = writeToFile(EMPTY, "empty.txt", CompressionType.UNCOMPRESSED);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index 0a2324f..7ff4a92 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -1461,40 +1461,6 @@ public class PipelineOptionsFactoryTest {
         containsString("The pipeline runner that will be used to execute the pipeline."));
   }
-  @Test
-  public void testFindProperClassLoaderIfContextClassLoaderIsNull() throws InterruptedException {
-    final ClassLoader[] classLoader = new ClassLoader[1];
-    Thread thread = new Thread(new Runnable() {
-
-      @Override
-      public void run() {
-        classLoader[0] = PipelineOptionsFactory.findClassLoader();
-      }
-    });
-    thread.setContextClassLoader(null);
-    thread.start();
-    thread.join();
-    assertEquals(PipelineOptionsFactory.class.getClassLoader(), classLoader[0]);
-  }
-
-  @Test
-  public void testFindProperClassLoaderIfContextClassLoaderIsAvailable()
-      throws InterruptedException {
-    final ClassLoader[] classLoader = new ClassLoader[1];
-    Thread thread = new Thread(new Runnable() {
-
-      @Override
-      public void run() {
-        classLoader[0] = PipelineOptionsFactory.findClassLoader();
-      }
-    });
-    ClassLoader cl = new ClassLoader() {};
-    thread.setContextClassLoader(cl);
-    thread.start();
-    thread.join();
-    assertEquals(cl, classLoader[0]);
-  }
-
   private static class RegisteredTestRunner extends PipelineRunner {
     public static PipelineRunner fromOptions(PipelineOptions options) {
       return new RegisteredTestRunner();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
index 4600d81..f8f53e7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
@@ -33,8 +33,8 @@ public class FileIOChannelFactoryRegistrarTest {
   @Test
   public void testServiceLoader() {
-    for (IOChannelFactoryRegistrar registrar :
-        Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
+    for (IOChannelFactoryRegistrar registrar
+        : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
       if (registrar instanceof FileIOChannelFactoryRegistrar) {
         return;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
index e27a043..38be65a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
@@ -46,7 +46,7 @@ import org.junit.runners.JUnit4;
 public class FileIOChannelFactoryTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
   @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
-  private FileIOChannelFactory factory = FileIOChannelFactory.create();
+  private FileIOChannelFactory factory = FileIOChannelFactory.fromOptions(null);
   private void testCreate(Path path) throws Exception {
     String expected = "my test string";

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
index 32bd4fc..a29dd45 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
@@ -33,8 +33,8 @@ public class GcsIOChannelFactoryRegistrarTest {
   @Test
   public void testServiceLoader() {
-    for (IOChannelFactoryRegistrar registrar :
-        Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
+    for (IOChannelFactoryRegistrar registrar
+        : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
       if (registrar instanceof GcsIOChannelFactoryRegistrar) {
         return;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
index d92d3cd..6dfa4c7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
@@ -19,15 +19,19 @@ package org.apache.beam.sdk.util;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
+import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import java.io.File;
 import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -40,6 +44,9 @@ public class IOChannelUtilsTest {
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
   @Test
   public void testShardFormatExpansion() {
     assertEquals("output-001-of-123.txt",
@@ -106,4 +113,36 @@ public class IOChannelUtilsTest {
     assertEquals(expected,
         IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "aa", "bb", "cc"));
   }
+
+  @Test
+  public void testRegisterIOFactoriesAllowOverride() throws Exception {
+    IOChannelUtils.registerIOFactoriesAllowOverride(PipelineOptionsFactory.create());
+    IOChannelUtils.registerIOFactoriesAllowOverride(PipelineOptionsFactory.create());
+    assertNotNull(IOChannelUtils.getFactory("gs"));
+    assertNotNull(IOChannelUtils.getFactory("file"));
+  }
+
+  @Test
+  public void testRegisterIOFactories() throws Exception {
+    IOChannelUtils.deregisterScheme("gs");
+    IOChannelUtils.deregisterScheme("file");
+
+    IOChannelUtils.registerIOFactories(PipelineOptionsFactory.create());
+    assertNotNull(IOChannelUtils.getFactory("gs"));
+    assertNotNull(IOChannelUtils.getFactory("file"));
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Failed to register IOChannelFactory");
+    thrown.expectMessage("override is not allowed");
+    IOChannelUtils.registerIOFactories(PipelineOptionsFactory.create());
+  }
+
+  @Test
+  public void testCheckDuplicateScheme() throws Exception {
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Scheme: [file] has conflicting registrars");
+    IOChannelUtils.checkDuplicateScheme(
+        Sets.newHashSet(
+            new FileIOChannelFactoryRegistrar(),
+            new FileIOChannelFactoryRegistrar()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
index 8a1708c..5fae25f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
@@ -148,4 +148,37 @@ public class ReflectHelpersTest {
         Options.class.getMethod("getObject").getAnnotations()[0]));
   }
+  @Test
+  public void testFindProperClassLoaderIfContextClassLoaderIsNull() throws InterruptedException {
+    final ClassLoader[] classLoader = new ClassLoader[1];
+    Thread thread = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        classLoader[0] = ReflectHelpers.findClassLoader();
+      }
+    });
+    thread.setContextClassLoader(null);
+    thread.start();
+    thread.join();
+    assertEquals(ReflectHelpers.class.getClassLoader(), classLoader[0]);
+  }
+
+  @Test
+  public void testFindProperClassLoaderIfContextClassLoaderIsAvailable()
+      throws InterruptedException {
+    final ClassLoader[] classLoader = new ClassLoader[1];
+    Thread thread = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        classLoader[0] = ReflectHelpers.findClassLoader();
+      }
+    });
+    ClassLoader cl = new ClassLoader() {};
+    thread.setContextClassLoader(cl);
+    thread.start();
+    thread.join();
+    assertEquals(cl, classLoader[0]);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 51a69a2..40965e4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1420,7 +1420,7 @@ public class BigQueryIOTest implements Serializable {
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setTempLocation("mock://tempLocation");
-    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+    IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
         .thenReturn("mock://tempLocation/output");
     when(mockDatasetService.getTable(anyString(), anyString(), anyString()))
@@ -1501,7 +1501,7 @@ public class BigQueryIOTest implements Serializable {
         eq(destinationTable.getDatasetId()), eq(destinationTable.getTableId())))
         .thenReturn(new Table().setSchema(new TableSchema()));
-    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+    IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
         .thenReturn("mock://tempLocation/output");
     when(mockJobService.pollJob(Mockito.any(), Mockito.anyInt()))
@@ -1584,7 +1584,7 @@ public class BigQueryIOTest implements Serializable {
         eq(destinationTable.getDatasetId()), eq(destinationTable.getTableId())))
         .thenReturn(new Table().setSchema(new TableSchema()));
-    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+    IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
         .thenReturn("mock://tempLocation/output");
     when(mockJobService.pollJob(Mockito.any(), Mockito.anyInt()))