Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4CB5910789 for ; Tue, 4 Mar 2014 16:34:10 +0000 (UTC) Received: (qmail 44173 invoked by uid 500); 4 Mar 2014 16:34:09 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 43269 invoked by uid 500); 4 Mar 2014 16:34:00 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 41996 invoked by uid 99); 4 Mar 2014 16:33:55 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Mar 2014 16:33:55 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 7C25F935728; Tue, 4 Mar 2014 16:33:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mkwhit@apache.org To: commits@crunch.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-357: Changed AvroMode to no longer an enum, an immutable object, exposed means of injecting instance into AvroFileSource, and added tests. Date: Tue, 4 Mar 2014 16:33:55 +0000 (UTC) Repository: crunch Updated Branches: refs/heads/master cfee30e98 -> 18028aab3 CRUNCH-357: Changed AvroMode to no longer an enum, an immutable object, exposed means of injecting instance into AvroFileSource, and added tests. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/18028aab Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/18028aab Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/18028aab Branch: refs/heads/master Commit: 18028aab3dfe18385bbf1d4d0f72ee97f68fbc97 Parents: cfee30e Author: Micah Whitacre Authored: Tue Mar 4 10:33:06 2014 -0600 Committer: Micah Whitacre Committed: Tue Mar 4 10:33:06 2014 -0600 ---------------------------------------------------------------------- .../apache/crunch/io/avro/AvroFileSource.java | 25 +- .../crunch/types/avro/AvroDeepCopier.java | 3 +- .../org/apache/crunch/types/avro/AvroMode.java | 247 ++++++++++++++++--- .../org/apache/crunch/types/avro/Avros.java | 8 +- .../apache/crunch/types/avro/AvroModeTest.java | 170 +++++++++++++ 5 files changed, 412 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/18028aab/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java index 1b6b27b..66db25d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java @@ -36,22 +36,31 @@ import org.apache.hadoop.fs.Path; public class AvroFileSource extends FileSourceImpl implements ReadableSource { - private static FormatBundle getBundle(AvroType ptype) { + private static FormatBundle getBundle(AvroType ptype, AvroMode mode) { FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class) - .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect())) - .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString()) - .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName()) - .set(RuntimeParameters.DISABLE_COMBINE_FILE, Boolean.FALSE.toString()); - AvroMode.fromType(ptype).configure(bundle); + .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect())) + .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString()) + .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName()) + .set(RuntimeParameters.DISABLE_COMBINE_FILE, Boolean.FALSE.toString()); + mode.configure(bundle); return bundle; } + + private static FormatBundle getBundle(AvroType ptype) { + return getBundle(ptype, AvroMode.fromType(ptype)); + } + private DatumReader reader; public AvroFileSource(Path path, AvroType ptype) { super(path, ptype, getBundle(ptype)); } + public AvroFileSource(Path path, AvroType ptype, AvroMode mode) { + super(path, ptype, getBundle(ptype, mode)); + } + public AvroFileSource(Path path, AvroType ptype, DatumReader reader) { super(path, ptype, getBundle(ptype)); this.reader = reader; @@ -60,6 +69,10 @@ public class AvroFileSource extends FileSourceImpl implements ReadableSour public AvroFileSource(List paths, AvroType ptype) { super(paths, ptype, getBundle(ptype)); } + + public AvroFileSource(List paths, AvroType ptype, AvroMode mode) { + super(paths, ptype, getBundle(ptype, mode)); + } public AvroFileSource(List paths, AvroType ptype, DatumReader reader) { super(paths, ptype, getBundle(ptype)); http://git-wip-us.apache.org/repos/asf/crunch/blob/18028aab/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java index 21dae45..1eca1b8 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java @@ -158,8 +158,7 @@ abstract class AvroDeepCopier implements DeepCopier, Serializable { @Override protected DatumWriter createDatumWriter(Configuration conf) { - AvroMode.REFLECT.setFromConfiguration(conf); - return AvroMode.REFLECT.getWriter(getSchema()); + return AvroMode.REFLECT.withFactoryFromConfiguration(conf).getWriter(getSchema()); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/18028aab/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java index e2646cd..90ff791 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java @@ -34,26 +34,62 @@ import org.apache.crunch.io.FormatBundle; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; -public enum AvroMode implements ReaderWriterFactory { - REFLECT (new ReflectDataFactory(), Avros.REFLECT_DATA_FACTORY_CLASS), - SPECIFIC ("crunch.specificfactory"), - GENERIC ("crunch.genericfactory"); +/** + * AvroMode is an immutable object used for configuring the reading and writing of Avro types. + * The mode will not be used or honored unless it has been appropriately configured using one of the supported + * methods. Certain sources might also support specifying a specific mode to use. + */ +public class AvroMode implements ReaderWriterFactory { + + /** + * Internal enum which represents the various Avro data types. + */ + public static enum ModeType { + SPECIFIC, REFLECT, GENERIC; + } + + /** + * Default mode to use for reading and writing {@link ReflectData Reflect} types. + */ + public static final AvroMode REFLECT = new AvroMode(ModeType.REFLECT, new ReflectDataFactory(), Avros.REFLECT_DATA_FACTORY_CLASS); + + /** + * Default mode to use for reading and writing {@link SpecificData Specific} types. + */ + public static final AvroMode SPECIFIC =new AvroMode(ModeType.SPECIFIC, "crunch.specificfactory"); + /** + * Default mode to use for reading and writing {@link GenericData Generic} types. + */ + public static final AvroMode GENERIC = new AvroMode(ModeType.GENERIC, "crunch.genericfactory"); public static final String AVRO_MODE_PROPERTY = "crunch.avro.mode"; public static final String AVRO_SHUFFLE_MODE_PROPERTY = "crunch.avro.shuffle.mode"; + /** + * Creates an AvroMode based on the {@link #AVRO_MODE_PROPERTY} property in the {@code conf}. + * @param conf The configuration holding the properties for mode to be created. + * @return an AvroMode based on the {@link #AVRO_MODE_PROPERTY} property in the {@code conf}. + */ public static AvroMode fromConfiguration(Configuration conf) { - AvroMode mode = conf.getEnum(AVRO_MODE_PROPERTY, REFLECT); - mode.setFromConfiguration(conf); - return mode; + AvroMode mode = getMode(conf.getEnum(AVRO_MODE_PROPERTY, ModeType.REFLECT)); + return mode.withFactoryFromConfiguration(conf); } + /** + * Creates an AvroMode based on the {@link #AVRO_SHUFFLE_MODE_PROPERTY} property in the {@code conf}. + * @param conf The configuration holding the properties for mode to be created. + * @return an AvroMode based on the {@link #AVRO_SHUFFLE_MODE_PROPERTY} property in the {@code conf}. + */ public static AvroMode fromShuffleConfiguration(Configuration conf) { - AvroMode mode = conf.getEnum(AVRO_SHUFFLE_MODE_PROPERTY, REFLECT); - mode.setFromConfiguration(conf); - return mode; + AvroMode mode = getMode(conf.getEnum(AVRO_SHUFFLE_MODE_PROPERTY, ModeType.REFLECT)); + return mode.withFactoryFromConfiguration(conf); } + /** + * Creates an {@link AvroMode} based upon the specified {@code type}. + * @param type the Avro type which indicates a specific mode. + * @return an {@link AvroMode} based upon the specified {@code type}. + */ public static AvroMode fromType(AvroType type) { if (type.hasReflect()) { if (type.hasSpecific()) { @@ -67,33 +103,59 @@ public enum AvroMode implements ReaderWriterFactory { } } + private static AvroMode getMode(ModeType modeType){ + switch(modeType){ + case SPECIFIC: + return SPECIFIC; + case GENERIC: + return GENERIC; + case REFLECT: + default: + return REFLECT; + } + } + private static ClassLoader specificLoader = null; public static void setSpecificClassLoader(ClassLoader loader) { specificLoader = loader; } - // the factory methods in this class may be overridden in ReaderWriterFactory - ReaderWriterFactory factory; + /** + * the factory methods in this class may be overridden in ReaderWriterFactory + */ + private final ReaderWriterFactory factory; + /** + * The property name used setting property into {@link Configuration}. + */ private final String propName; - private AvroMode(ReaderWriterFactory factory, String propName) { + /** + * The mode type representing the Avro data form. + */ + private final ModeType modeType; + + private AvroMode(ModeType modeType, ReaderWriterFactory factory, String propName) { this.factory = factory; this.propName = propName; + this.modeType = modeType; } - private AvroMode(String propName) { - this.factory = null; - this.propName = propName; + private AvroMode(ModeType modeType, String propName) { + this(modeType, null, propName); } + /** + * Returns a {@link GenericData} instance based on the mode type. + * @return a {@link GenericData} instance based on the mode type. + */ public GenericData getData() { if (factory != null) { return factory.getData(); } - switch(this) { + switch(this.modeType) { case REFLECT: return ReflectData.AllowNull.get(); case SPECIFIC: @@ -103,12 +165,18 @@ public enum AvroMode implements ReaderWriterFactory { } } + /** + * Creates a {@code DatumReader} based on the {@code schema}. + * @param schema the schema to be read + * @param the record type created by the reader. + * @return a {@code DatumReader} based on the {@code schema}. + */ public DatumReader getReader(Schema schema) { if (factory != null) { return factory.getReader(schema); } - switch (this) { + switch (this.modeType) { case REFLECT: return new ReflectDatumReader(schema); case SPECIFIC: @@ -123,12 +191,18 @@ public enum AvroMode implements ReaderWriterFactory { } } + /** + * Creates a {@code DatumWriter} based on the {@code schema}. + * @param schema the schema to be read + * @param the record type created by the writer. + * @return a {@code DatumWriter} based on the {@code schema}. + */ public DatumWriter getWriter(Schema schema) { if (factory != null) { return factory.getWriter(schema); } - switch (this) { + switch (this.modeType) { case REFLECT: return new ReflectDatumWriter(schema); case SPECIFIC: @@ -138,38 +212,155 @@ public enum AvroMode implements ReaderWriterFactory { } } - public void override(ReaderWriterFactory factory) { + /** + * Creates a new {@code AvroMode} instance which will utilize the {@code factory} instance + * for creating Avro readers and writers. + * + * @param factory factory implementation for the mode to use + * @return a new {@code AvroMode} instance which will utilize the {@code factory} instance + * for creating Avro readers and writers. + * @deprecated use {@link #withFactory(ReaderWriterFactory)} instead. + */ + @Deprecated + public AvroMode override(ReaderWriterFactory factory) { + return withFactory(factory); + } + + /** + * Creates a new {@code AvroMode} instance which will utilize the {@code factory} instance + * for creating Avro readers and writers. If {@code null} the default factory for the mode + * will be used. + * + * @param factory factory implementation for the mode to use + * @return a new {@code AvroMode} instance which will utilize the {@code factory} instance + * for creating Avro readers and writers. + */ + public AvroMode withFactory(ReaderWriterFactory factory){ if (factory != this) { - this.factory = factory; + return withReaderWriterFactory(factory); + } else { + return this; } } + /** + * Populates the {@code conf} with mode specific settings for use during the shuffle phase. + * @param conf the configuration to populate. + */ public void configureShuffle(Configuration conf) { - conf.setEnum(AVRO_SHUFFLE_MODE_PROPERTY, this); - configureFactory(conf); + conf.setEnum(AVRO_SHUFFLE_MODE_PROPERTY, this.modeType); + configure(conf); } + /** + * Populates the {@code bundle} with mode specific settings for the specific {@link FormatBundle}. + * @param bundle the bundle to populate. + */ public void configure(FormatBundle bundle) { - bundle.set(AVRO_MODE_PROPERTY, this.toString()); + bundle.set(AVRO_MODE_PROPERTY, this.modeType.toString()); if (factory != null) { bundle.set(propName, factory.getClass().getName()); } } - public void configureFactory(Configuration conf) { + /** + * Populates the {@code conf} with mode specific settings. + * @param conf the configuration to populate. + */ + public void configure(Configuration conf) { + conf.set(AVRO_MODE_PROPERTY, this.modeType.toString()); if (factory != null) { conf.setClass(propName, factory.getClass(), ReaderWriterFactory.class); } } + /** + * Populates the {@code conf} with mode specific settings. + * @param conf the configuration to populate. + * @deprecated use {@link #configure(org.apache.hadoop.conf.Configuration)} + */ + @Deprecated + public void configureFactory(Configuration conf) { + configure(conf); + } + + /** + * Creates a new {@code AvroMode} instance which will utilize the {@code factory} instance + * for creating Avro readers and writers. If {@code null} the default factory for the mode + * will be used. + * + * @param readerWriterFactory factory implementation for the mode to use + * @return a new {@code AvroMode} instance which will utilize the {@code factory} instance + * for creating Avro readers and writers. + */ + private AvroMode withReaderWriterFactory(ReaderWriterFactory readerWriterFactory) { + return new AvroMode(modeType, readerWriterFactory, propName); + } + + /** + * Returns the factory that will be used for the mode. + * + * @return the factory that will be used for the mode. + */ + public ReaderWriterFactory getFactory() { + return factory != null ? factory : this; + } + + @Override + public boolean equals(Object o) { + if(o == null){ + return false; + } + + if(this == o){ + return true; + } + + if(!(o instanceof AvroMode)){ + return false; + } + + AvroMode that = (AvroMode) o; + + if(!this.modeType.equals(that.modeType)){ + return false; + } + if(!this.propName.equals(that.propName)){ + return false; + } + + if(this.factory != null){ + if(that.factory == null){ + return false; + }else { + return this.factory.equals(that.factory); + } + }else{ + return that.factory == null; + } + } + + @Override + public int hashCode() { + int hash = propName.hashCode(); + hash = 31*hash + modeType.hashCode(); + if(factory != null){ + hash = 31*hash+factory.hashCode(); + } + + return hash; + } + @SuppressWarnings("unchecked") - void setFromConfiguration(Configuration conf) { + AvroMode withFactoryFromConfiguration(Configuration conf) { // although the shuffle and input/output use different properties for mode, // this is shared - only one ReaderWriterFactory can be used. Class factoryClass = conf.getClass(propName, this.getClass()); if (factoryClass != this.getClass()) { - this.factory = (ReaderWriterFactory) - ReflectionUtils.newInstance(factoryClass, conf); + return withReaderWriterFactory((ReaderWriterFactory) + ReflectionUtils.newInstance(factoryClass, conf)); + } else { + return this; } } } http://git-wip-us.apache.org/repos/asf/crunch/blob/18028aab/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java index a051a5f..266cb12 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -101,7 +101,7 @@ public class Avros { * @deprecated as of 0.9.0; use AvroMode.REFLECT.override(ReaderWriterFactory) */ public static ReflectDataFactory REFLECT_DATA_FACTORY = - (ReflectDataFactory) AvroMode.REFLECT.factory; + (ReflectDataFactory) AvroMode.REFLECT.getFactory(); /** * The name of the configuration parameter that tracks which reflection @@ -116,16 +116,14 @@ public class Avros { */ @Deprecated public static void configureReflectDataFactory(Configuration conf) { - AvroMode.REFLECT.override(REFLECT_DATA_FACTORY); - AvroMode.REFLECT.configureFactory(conf); + AvroMode.REFLECT.withFactory(REFLECT_DATA_FACTORY).configure(conf); } /** * @deprecated as of 0.9.0; use AvroMode.fromConfiguration(conf) */ public static ReflectDataFactory getReflectDataFactory(Configuration conf) { - AvroMode.REFLECT.setFromConfiguration(conf); - return (ReflectDataFactory) AvroMode.REFLECT.factory; + return (ReflectDataFactory)AvroMode.REFLECT.withFactoryFromConfiguration(conf).getFactory(); } public static void checkCombiningSpecificAndReflectionSchemas() { http://git-wip-us.apache.org/repos/asf/crunch/blob/18028aab/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroModeTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroModeTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroModeTest.java new file mode 100644 index 0000000..d1fdbb1 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroModeTest.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.types.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.specific.SpecificData; +import org.apache.crunch.io.FormatBundle; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertThat; + +public class AvroModeTest { + + @Test + public void customWithFactory(){ + ReaderWriterFactory fakeFactory = new FakeReaderWriterFactory(); + AvroMode mode = AvroMode.SPECIFIC.withFactory(fakeFactory); + assertThat(mode.getFactory(), is(fakeFactory)); + //assert that the original is unchanged + assertThat(mode, is(not(AvroMode.SPECIFIC))); + assertThat(AvroMode.SPECIFIC.getFactory(), is((ReaderWriterFactory) AvroMode.SPECIFIC)); + } + + @Test + public void sameWithFactory(){ + AvroMode mode = AvroMode.SPECIFIC.withFactory(AvroMode.SPECIFIC); + assertThat(mode.getFactory(), is( (ReaderWriterFactory) AvroMode.SPECIFIC)); + } + + @Test + public void getDataSpecific(){ + assertThat(AvroMode.SPECIFIC.getData(), is(instanceOf(SpecificData.class))); + } + + @Test + public void getDataGeneric(){ + assertThat(AvroMode.GENERIC.getData(), is(instanceOf(GenericData.class))); + } + + @Test + public void getDataReflect(){ + assertThat(AvroMode.REFLECT.getData(), is(instanceOf(ReflectData.class))); + } + + @Test + public void configureAndRetrieveSpecific(){ + Configuration conf = new Configuration(); + AvroMode.SPECIFIC.configure(conf); + AvroMode returnedMode = AvroMode.fromConfiguration(conf); + assertThat(returnedMode, is(AvroMode.SPECIFIC)); + } + + @Test + public void configureAndRetrieveGeneric(){ + Configuration conf = new Configuration(); + AvroMode.GENERIC.configure(conf); + AvroMode returnedMode = AvroMode.fromConfiguration(conf); + assertThat(returnedMode, is(AvroMode.GENERIC)); + } + + @Test + public void configureShuffleAndRetrieveSpecific(){ + Configuration conf = new Configuration(); + AvroMode.SPECIFIC.configureShuffle(conf); + AvroMode returnedMode = AvroMode.fromShuffleConfiguration(conf); + assertThat(returnedMode, is(AvroMode.SPECIFIC)); + } + + @Test + public void configureShuffleAndRetrieveGeneric(){ + Configuration conf = new Configuration(); + AvroMode.GENERIC.configureShuffle(conf); + AvroMode returnedMode = AvroMode.fromShuffleConfiguration(conf); + assertThat(returnedMode, is(AvroMode.GENERIC)); + } + + @Test + public void configureBundleSpecific(){ + FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class); + Configuration config = new Configuration(); + AvroMode.SPECIFIC.configure(bundle); + bundle.configure(config); + AvroMode returnedMode = AvroMode.fromConfiguration(config); + assertThat(returnedMode.getData(), is(instanceOf(SpecificData.class))); + } + + @Test + public void configureBundleGeneric(){ + FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class); + Configuration config = new Configuration(); + AvroMode.GENERIC.configure(bundle); + bundle.configure(config); + AvroMode returnedMode = AvroMode.fromConfiguration(config); + assertThat(returnedMode.getData(), is(instanceOf(GenericData.class))); + } + + @Test + public void configureBundleReflect(){ + FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class); + Configuration config = new Configuration(); + AvroMode.REFLECT.configure(bundle); + bundle.configure(config); + AvroMode returnedMode = AvroMode.fromConfiguration(config); + assertThat(returnedMode.getData(), is(instanceOf(ReflectData.class))); + } + + @Test + public void configureBundleCustomWithFactory(){ + ReaderWriterFactory fakeFactory = new FakeReaderWriterFactory(); + AvroMode mode = AvroMode.SPECIFIC.withFactory(fakeFactory); + FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class); + Configuration config = new Configuration(); + mode.configure(bundle); + bundle.configure(config); + AvroMode returnedMode = AvroMode.fromConfiguration(config); + assertThat(returnedMode.getFactory(), is(instanceOf(FakeReaderWriterFactory.class))); + } + + @Test + public void configureCustomWithFactory(){ + ReaderWriterFactory fakeFactory = new FakeReaderWriterFactory(); + AvroMode mode = AvroMode.SPECIFIC.withFactory(fakeFactory); + Configuration config = new Configuration(); + mode.configure(config); + AvroMode returnedMode = AvroMode.fromConfiguration(config); + assertThat(returnedMode.getFactory(), is(instanceOf(FakeReaderWriterFactory.class))); + } + + private static class FakeReaderWriterFactory implements ReaderWriterFactory{ + + @Override + public GenericData getData() { + return null; + } + + @Override + public DatumReader getReader(Schema schema) { + return null; + } + + @Override + public DatumWriter getWriter(Schema schema) { + return null; + } + } + +}