crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mkw...@apache.org
Subject git commit: CRUNCH-357: Changed AvroMode to no longer an enum, an immutable object, exposed means of injecting instance into AvroFileSource, and added tests.
Date Tue, 04 Mar 2014 16:53:24 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 288db3e8b -> 82ce49f8d


CRUNCH-357: Changed AvroMode to no longer an enum, an
 immutable object, exposed means of injecting instance into AvroFileSource,
 and added tests.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/82ce49f8
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/82ce49f8
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/82ce49f8

Branch: refs/heads/apache-crunch-0.8
Commit: 82ce49f8dcf98b61740f8fd3a04de36ef5c8b853
Parents: 288db3e
Author: Micah Whitacre <mkwhit@apache.org>
Authored: Tue Mar 4 10:33:06 2014 -0600
Committer: Micah Whitacre <mkwhit@apache.org>
Committed: Tue Mar 4 10:34:50 2014 -0600

----------------------------------------------------------------------
 .../apache/crunch/io/avro/AvroFileSource.java   |  25 +-
 .../crunch/types/avro/AvroDeepCopier.java       |   3 +-
 .../org/apache/crunch/types/avro/AvroMode.java  | 247 ++++++++++++++++---
 .../org/apache/crunch/types/avro/Avros.java     |   8 +-
 .../apache/crunch/types/avro/AvroModeTest.java  | 170 +++++++++++++
 5 files changed, 412 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/82ce49f8/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
index 1b6b27b..66db25d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -36,22 +36,31 @@ import org.apache.hadoop.fs.Path;
 
 public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
 
-  private static <S> FormatBundle getBundle(AvroType<S> ptype) {
+  private static <S> FormatBundle getBundle(AvroType<S> ptype, AvroMode mode) {
     FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class)
-        .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect()))
-        .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
-        .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName())
-        .set(RuntimeParameters.DISABLE_COMBINE_FILE, Boolean.FALSE.toString());
-    AvroMode.fromType(ptype).configure(bundle);
+          .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect()))
+          .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
+          .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName())
+          .set(RuntimeParameters.DISABLE_COMBINE_FILE, Boolean.FALSE.toString());
+    mode.configure(bundle);
     return bundle;
   }
 
+
+  private static <S> FormatBundle getBundle(AvroType<S> ptype) {
+    return getBundle(ptype, AvroMode.fromType(ptype));
+  }
+
   private DatumReader<T> reader;
   
   public AvroFileSource(Path path, AvroType<T> ptype) {
     super(path, ptype, getBundle(ptype));
   }
 
+  public AvroFileSource(Path path, AvroType<T> ptype, AvroMode mode) {
+    super(path, ptype, getBundle(ptype, mode));
+  }
+
   public AvroFileSource(Path path, AvroType<T> ptype, DatumReader<T> reader) {
     super(path, ptype, getBundle(ptype));
     this.reader = reader;
@@ -60,6 +69,10 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
   public AvroFileSource(List<Path> paths, AvroType<T> ptype) {
     super(paths, ptype, getBundle(ptype));
   }
+
+  public AvroFileSource(List<Path> paths, AvroType<T> ptype, AvroMode mode) {
+    super(paths, ptype, getBundle(ptype, mode));
+  }
   
   public AvroFileSource(List<Path> paths, AvroType<T> ptype, DatumReader<T> reader) {
     super(paths, ptype, getBundle(ptype));

http://git-wip-us.apache.org/repos/asf/crunch/blob/82ce49f8/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index 21dae45..1eca1b8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -158,8 +158,7 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
 
     @Override
     protected DatumWriter<T> createDatumWriter(Configuration conf) {
-      AvroMode.REFLECT.setFromConfiguration(conf);
-      return AvroMode.REFLECT.getWriter(getSchema());
+      return AvroMode.REFLECT.withFactoryFromConfiguration(conf).getWriter(getSchema());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/82ce49f8/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
index e2646cd..90ff791 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
@@ -34,26 +34,62 @@ import org.apache.crunch.io.FormatBundle;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 
-public enum AvroMode implements ReaderWriterFactory {
-  REFLECT (new ReflectDataFactory(), Avros.REFLECT_DATA_FACTORY_CLASS),
-  SPECIFIC ("crunch.specificfactory"),
-  GENERIC ("crunch.genericfactory");
+/**
+ * AvroMode is an immutable object used for configuring the reading and writing of Avro types.
+ * The mode will not be used or honored unless it has been appropriately configured using one of the supported
+ * methods.  Certain sources might also support specifying a specific mode to use.
+ */
+public class AvroMode implements ReaderWriterFactory {
+
+  /**
+   * Internal enum which represents the various Avro data types.
+   */
+  public static enum ModeType {
+    SPECIFIC, REFLECT, GENERIC;
+  }
+
+  /**
+   * Default mode to use for reading and writing {@link ReflectData Reflect} types.
+   */
+  public static final AvroMode REFLECT  = new AvroMode(ModeType.REFLECT, new ReflectDataFactory(), Avros.REFLECT_DATA_FACTORY_CLASS);
+
+  /**
+   * Default mode to use for reading and writing {@link SpecificData Specific} types.
+   */
+  public static final AvroMode SPECIFIC =new AvroMode(ModeType.SPECIFIC, "crunch.specificfactory");
+  /**
+   * Default mode to use for reading and writing {@link GenericData Generic} types.
+   */
+  public static final AvroMode GENERIC = new AvroMode(ModeType.GENERIC, "crunch.genericfactory");
 
   public static final String AVRO_MODE_PROPERTY = "crunch.avro.mode";
   public static final String AVRO_SHUFFLE_MODE_PROPERTY = "crunch.avro.shuffle.mode";
 
+  /**
+   * Creates an AvroMode based on the {@link #AVRO_MODE_PROPERTY} property in the {@code conf}.
+   * @param conf The configuration holding the properties for mode to be created.
+   * @return an AvroMode based on the {@link #AVRO_MODE_PROPERTY} property in the {@code conf}.
+   */
   public static AvroMode fromConfiguration(Configuration conf) {
-    AvroMode mode = conf.getEnum(AVRO_MODE_PROPERTY, REFLECT);
-    mode.setFromConfiguration(conf);
-    return mode;
+    AvroMode mode = getMode(conf.getEnum(AVRO_MODE_PROPERTY, ModeType.REFLECT));
+    return mode.withFactoryFromConfiguration(conf);
   }
 
+  /**
+   * Creates an AvroMode based on the {@link #AVRO_SHUFFLE_MODE_PROPERTY} property in the {@code conf}.
+   * @param conf The configuration holding the properties for mode to be created.
+   * @return an AvroMode based on the {@link #AVRO_SHUFFLE_MODE_PROPERTY} property in the {@code conf}.
+   */
   public static AvroMode fromShuffleConfiguration(Configuration conf) {
-    AvroMode mode = conf.getEnum(AVRO_SHUFFLE_MODE_PROPERTY, REFLECT);
-    mode.setFromConfiguration(conf);
-    return mode;
+    AvroMode mode = getMode(conf.getEnum(AVRO_SHUFFLE_MODE_PROPERTY, ModeType.REFLECT));
+    return mode.withFactoryFromConfiguration(conf);
   }
 
+  /**
+   * Creates an {@link AvroMode} based upon the specified {@code type}.
+   * @param type the Avro type which indicates a specific mode.
+   * @return an {@link AvroMode} based upon the specified {@code type}.
+   */
   public static AvroMode fromType(AvroType<?> type) {
     if (type.hasReflect()) {
       if (type.hasSpecific()) {
@@ -67,33 +103,59 @@ public enum AvroMode implements ReaderWriterFactory {
     }
   }
 
+  private static AvroMode getMode(ModeType modeType){
+    switch(modeType){
+        case SPECIFIC:
+          return SPECIFIC;
+        case GENERIC:
+          return GENERIC;
+        case REFLECT:
+        default:
+          return REFLECT;
+    }
+  }
+
   private static ClassLoader specificLoader = null;
 
   public static void setSpecificClassLoader(ClassLoader loader) {
     specificLoader = loader;
   }
 
-  // the factory methods in this class may be overridden in ReaderWriterFactory
-  ReaderWriterFactory factory;
+  /**
+   * the factory methods in this class may be overridden in ReaderWriterFactory
+   */
+  private final ReaderWriterFactory factory;
 
+  /**
+   * The property name used setting property into {@link Configuration}.
+   */
   private final String propName;
 
-  private AvroMode(ReaderWriterFactory factory, String propName) {
+  /**
+   * The mode type representing the Avro data form.
+   */
+  private final ModeType modeType;
+
+  private AvroMode(ModeType modeType, ReaderWriterFactory factory, String propName) {
     this.factory = factory;
     this.propName = propName;
+    this.modeType = modeType;
   }
 
-  private AvroMode(String propName) {
-    this.factory = null;
-    this.propName = propName;
+  private AvroMode(ModeType modeType, String propName) {
+    this(modeType, null, propName);
   }
 
+  /**
+   * Returns a {@link GenericData} instance based on the mode type.
+   * @return a {@link GenericData} instance based on the mode type.
+   */
   public GenericData getData() {
     if (factory != null) {
       return factory.getData();
     }
 
-    switch(this) {
+    switch(this.modeType) {
       case REFLECT:
         return ReflectData.AllowNull.get();
       case SPECIFIC:
@@ -103,12 +165,18 @@ public enum AvroMode implements ReaderWriterFactory {
     }
   }
 
+  /**
+   * Creates a {@code DatumReader} based on the {@code schema}.
+   * @param schema the schema to be read
+   * @param <T> the record type created by the reader.
+   * @return a {@code DatumReader} based on the {@code schema}.
+   */
   public <T> DatumReader<T> getReader(Schema schema) {
     if (factory != null) {
       return factory.getReader(schema);
     }
 
-    switch (this) {
+    switch (this.modeType) {
       case REFLECT:
         return new ReflectDatumReader<T>(schema);
       case SPECIFIC:
@@ -123,12 +191,18 @@ public enum AvroMode implements ReaderWriterFactory {
     }
   }
 
+  /**
+   * Creates a {@code DatumWriter} based on the {@code schema}.
+   * @param schema the schema to be read
+   * @param <T> the record type created by the writer.
+   * @return a {@code DatumWriter} based on the {@code schema}.
+   */
   public <T> DatumWriter<T> getWriter(Schema schema) {
     if (factory != null) {
       return factory.getWriter(schema);
     }
 
-    switch (this) {
+    switch (this.modeType) {
       case REFLECT:
         return new ReflectDatumWriter<T>(schema);
       case SPECIFIC:
@@ -138,38 +212,155 @@ public enum AvroMode implements ReaderWriterFactory {
     }
   }
 
-  public void override(ReaderWriterFactory factory) {
+  /**
+   * Creates a new {@code AvroMode} instance which will utilize the {@code factory} instance
+   * for creating Avro readers and writers.
+   *
+   * @param factory factory implementation for the mode to use
+   * @return a new {@code AvroMode} instance which will utilize the {@code factory} instance
+   * for creating Avro readers and writers.
+   * @deprecated use {@link #withFactory(ReaderWriterFactory)} instead.
+   */
+  @Deprecated
+  public AvroMode override(ReaderWriterFactory factory) {
+    return withFactory(factory);
+  }
+
+  /**
+   * Creates a new {@code AvroMode} instance which will utilize the {@code factory} instance
+   * for creating Avro readers and writers.  If {@code null} the default factory for the mode
+   * will be used.
+   *
+   * @param factory factory implementation for the mode to use
+   * @return a new {@code AvroMode} instance which will utilize the {@code factory} instance
+   * for creating Avro readers and writers.
+   */
+  public AvroMode withFactory(ReaderWriterFactory factory){
     if (factory != this) {
-      this.factory = factory;
+      return withReaderWriterFactory(factory);
+    } else {
+      return this;
     }
   }
 
+  /**
+   * Populates the {@code conf} with mode specific settings for use during the shuffle phase.
+   * @param conf the configuration to populate.
+   */
   public void configureShuffle(Configuration conf) {
-    conf.setEnum(AVRO_SHUFFLE_MODE_PROPERTY, this);
-    configureFactory(conf);
+    conf.setEnum(AVRO_SHUFFLE_MODE_PROPERTY, this.modeType);
+    configure(conf);
   }
 
+  /**
+   * Populates the {@code bundle} with mode specific settings for the specific {@link FormatBundle}.
+   * @param bundle the bundle to populate.
+   */
   public void configure(FormatBundle bundle) {
-    bundle.set(AVRO_MODE_PROPERTY, this.toString());
+    bundle.set(AVRO_MODE_PROPERTY, this.modeType.toString());
     if (factory != null) {
       bundle.set(propName, factory.getClass().getName());
     }
   }
 
-  public void configureFactory(Configuration conf) {
+  /**
+   * Populates the {@code conf} with mode specific settings.
+   * @param conf the configuration to populate.
+   */
+  public void configure(Configuration conf) {
+    conf.set(AVRO_MODE_PROPERTY, this.modeType.toString());
     if (factory != null) {
       conf.setClass(propName, factory.getClass(), ReaderWriterFactory.class);
     }
   }
 
+  /**
+   * Populates the {@code conf} with mode specific settings.
+   * @param conf the configuration to populate.
+   * @deprecated use {@link #configure(org.apache.hadoop.conf.Configuration)}
+   */
+  @Deprecated
+  public void configureFactory(Configuration conf) {
+    configure(conf);
+  }
+
+  /**
+   * Creates a new {@code AvroMode} instance which will utilize the {@code factory} instance
+   * for creating Avro readers and writers.  If {@code null} the default factory for the mode
+   * will be used.
+   *
+   * @param readerWriterFactory factory implementation for the mode to use
+   * @return a new {@code AvroMode} instance which will utilize the {@code factory} instance
+   * for creating Avro readers and writers.
+   */
+  private AvroMode withReaderWriterFactory(ReaderWriterFactory readerWriterFactory) {
+    return new AvroMode(modeType, readerWriterFactory, propName);
+  }
+
+  /**
+   * Returns the factory that will be used for the mode.
+   *
+   * @return the factory that will be used for the mode.
+   */
+  public ReaderWriterFactory getFactory() {
+    return factory != null ? factory : this;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if(o == null){
+      return false;
+    }
+
+    if(this == o){
+      return true;
+    }
+
+    if(!(o instanceof AvroMode)){
+      return false;
+    }
+
+    AvroMode that = (AvroMode) o;
+
+    if(!this.modeType.equals(that.modeType)){
+      return false;
+    }
+    if(!this.propName.equals(that.propName)){
+      return false;
+    }
+
+    if(this.factory != null){
+      if(that.factory == null){
+        return false;
+      }else {
+        return this.factory.equals(that.factory);
+      }
+    }else{
+      return that.factory == null;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = propName.hashCode();
+    hash = 31*hash + modeType.hashCode();
+    if(factory != null){
+      hash = 31*hash+factory.hashCode();
+    }
+
+    return hash;
+  }
+
   @SuppressWarnings("unchecked")
-  void setFromConfiguration(Configuration conf) {
+  AvroMode withFactoryFromConfiguration(Configuration conf) {
     // although the shuffle and input/output use different properties for mode,
     // this is shared - only one ReaderWriterFactory can be used.
     Class<?> factoryClass = conf.getClass(propName, this.getClass());
     if (factoryClass != this.getClass()) {
-      this.factory = (ReaderWriterFactory)
-          ReflectionUtils.newInstance(factoryClass, conf);
+      return withReaderWriterFactory((ReaderWriterFactory)
+                                         ReflectionUtils.newInstance(factoryClass, conf));
+    } else {
+      return this;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/82ce49f8/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index a051a5f..266cb12 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -101,7 +101,7 @@ public class Avros {
    * @deprecated as of 0.9.0; use AvroMode.REFLECT.override(ReaderWriterFactory)
    */
   public static ReflectDataFactory REFLECT_DATA_FACTORY =
-      (ReflectDataFactory) AvroMode.REFLECT.factory;
+      (ReflectDataFactory) AvroMode.REFLECT.getFactory();
 
   /**
    * The name of the configuration parameter that tracks which reflection
@@ -116,16 +116,14 @@ public class Avros {
    */
   @Deprecated
   public static void configureReflectDataFactory(Configuration conf) {
-    AvroMode.REFLECT.override(REFLECT_DATA_FACTORY);
-    AvroMode.REFLECT.configureFactory(conf);
+    AvroMode.REFLECT.withFactory(REFLECT_DATA_FACTORY).configure(conf);
   }
 
   /**
    * @deprecated as of 0.9.0; use AvroMode.fromConfiguration(conf)
    */
   public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
-    AvroMode.REFLECT.setFromConfiguration(conf);
-    return (ReflectDataFactory) AvroMode.REFLECT.factory;
+    return (ReflectDataFactory)AvroMode.REFLECT.withFactoryFromConfiguration(conf).getFactory();
   }
 
   public static void checkCombiningSpecificAndReflectionSchemas() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/82ce49f8/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroModeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroModeTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroModeTest.java
new file mode 100644
index 0000000..d1fdbb1
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroModeTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+
+public class AvroModeTest {
+
+  @Test
+  public void customWithFactory(){
+    ReaderWriterFactory fakeFactory = new FakeReaderWriterFactory();
+    AvroMode mode = AvroMode.SPECIFIC.withFactory(fakeFactory);
+    assertThat(mode.getFactory(), is(fakeFactory));
+    //assert that the original is unchanged
+    assertThat(mode, is(not(AvroMode.SPECIFIC)));
+    assertThat(AvroMode.SPECIFIC.getFactory(), is((ReaderWriterFactory) AvroMode.SPECIFIC));
+  }
+
+  @Test
+  public void sameWithFactory(){
+    AvroMode mode = AvroMode.SPECIFIC.withFactory(AvroMode.SPECIFIC);
+    assertThat(mode.getFactory(), is( (ReaderWriterFactory) AvroMode.SPECIFIC));
+  }
+
+  @Test
+  public void getDataSpecific(){
+    assertThat(AvroMode.SPECIFIC.getData(), is(instanceOf(SpecificData.class)));
+  }
+
+  @Test
+  public void getDataGeneric(){
+    assertThat(AvroMode.GENERIC.getData(), is(instanceOf(GenericData.class)));
+  }
+
+  @Test
+  public void getDataReflect(){
+    assertThat(AvroMode.REFLECT.getData(), is(instanceOf(ReflectData.class)));
+  }
+
+  @Test
+  public void configureAndRetrieveSpecific(){
+    Configuration conf = new Configuration();
+    AvroMode.SPECIFIC.configure(conf);
+    AvroMode returnedMode = AvroMode.fromConfiguration(conf);
+    assertThat(returnedMode, is(AvroMode.SPECIFIC));
+  }
+
+  @Test
+  public void configureAndRetrieveGeneric(){
+    Configuration conf = new Configuration();
+    AvroMode.GENERIC.configure(conf);
+    AvroMode returnedMode = AvroMode.fromConfiguration(conf);
+    assertThat(returnedMode, is(AvroMode.GENERIC));
+  }
+
+  @Test
+  public void configureShuffleAndRetrieveSpecific(){
+    Configuration conf = new Configuration();
+    AvroMode.SPECIFIC.configureShuffle(conf);
+    AvroMode returnedMode = AvroMode.fromShuffleConfiguration(conf);
+    assertThat(returnedMode, is(AvroMode.SPECIFIC));
+  }
+
+  @Test
+  public void configureShuffleAndRetrieveGeneric(){
+    Configuration conf = new Configuration();
+    AvroMode.GENERIC.configureShuffle(conf);
+    AvroMode returnedMode = AvroMode.fromShuffleConfiguration(conf);
+    assertThat(returnedMode, is(AvroMode.GENERIC));
+  }
+
+  @Test
+  public void configureBundleSpecific(){
+    FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class);
+    Configuration config = new Configuration();
+    AvroMode.SPECIFIC.configure(bundle);
+    bundle.configure(config);
+    AvroMode returnedMode = AvroMode.fromConfiguration(config);
+    assertThat(returnedMode.getData(), is(instanceOf(SpecificData.class)));
+  }
+
+  @Test
+  public void configureBundleGeneric(){
+    FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class);
+    Configuration config = new Configuration();
+    AvroMode.GENERIC.configure(bundle);
+    bundle.configure(config);
+    AvroMode returnedMode = AvroMode.fromConfiguration(config);
+    assertThat(returnedMode.getData(), is(instanceOf(GenericData.class)));
+  }
+
+  @Test
+  public void configureBundleReflect(){
+    FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class);
+    Configuration config = new Configuration();
+    AvroMode.REFLECT.configure(bundle);
+    bundle.configure(config);
+    AvroMode returnedMode = AvroMode.fromConfiguration(config);
+    assertThat(returnedMode.getData(), is(instanceOf(ReflectData.class)));
+  }
+
+  @Test
+  public void configureBundleCustomWithFactory(){
+    ReaderWriterFactory fakeFactory = new FakeReaderWriterFactory();
+    AvroMode mode = AvroMode.SPECIFIC.withFactory(fakeFactory);
+    FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class);
+    Configuration config = new Configuration();
+    mode.configure(bundle);
+    bundle.configure(config);
+    AvroMode returnedMode = AvroMode.fromConfiguration(config);
+    assertThat(returnedMode.getFactory(), is(instanceOf(FakeReaderWriterFactory.class)));
+  }
+
+  @Test
+  public void configureCustomWithFactory(){
+    ReaderWriterFactory fakeFactory = new FakeReaderWriterFactory();
+    AvroMode mode = AvroMode.SPECIFIC.withFactory(fakeFactory);
+    Configuration config = new Configuration();
+    mode.configure(config);
+    AvroMode returnedMode = AvroMode.fromConfiguration(config);
+    assertThat(returnedMode.getFactory(), is(instanceOf(FakeReaderWriterFactory.class)));
+  }
+
+  private static class FakeReaderWriterFactory implements ReaderWriterFactory{
+
+    @Override
+    public GenericData getData() {
+      return null;
+    }
+
+    @Override
+    public <D> DatumReader<D> getReader(Schema schema) {
+      return null;
+    }
+
+    @Override
+    public <D> DatumWriter<D> getWriter(Schema schema) {
+      return null;
+    }
+  }
+
+}


Mime
View raw message