incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [4/9] git commit: Add a static initializer to indicate whether combined specific+reflect schemas are supported by the version of avro used
Date Wed, 22 Aug 2012 06:26:48 GMT
Add a static initializer to indicate whether combined specific+reflect schemas are supported by the version of avro used

Signed-off-by: Josh Wills <jwills@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/89e58b3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/89e58b3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/89e58b3e

Branch: refs/heads/master
Commit: 89e58b3e254d1ebb7885cc57574981a61b16e091
Parents: b05f183
Author: jwills <jwills@apache.org>
Authored: Tue Aug 21 07:25:56 2012 -0700
Committer: Josh Wills <jwills@cloudera.com>
Committed: Tue Aug 21 18:54:29 2012 -0700

----------------------------------------------------------------------
 .../crunch/io/avro/AvroFileReaderFactory.java      |    2 +-
 .../org/apache/crunch/io/avro/AvroFileSource.java  |    2 +-
 .../crunch/types/avro/AvroGroupedTableType.java    |    8 +++-
 .../org/apache/crunch/types/avro/AvroType.java     |   26 ++++++-------
 .../java/org/apache/crunch/types/avro/Avros.java   |   29 ++++++++++++++-
 .../crunch/types/avro/AvroTableTypeTest.java       |    6 +-
 .../org/apache/crunch/types/avro/AvroTypeTest.java |   30 +++++++-------
 7 files changed, 66 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/89e58b3e/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
index 220b134..3345bd6 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -53,7 +53,7 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
   }
 
   private DatumReader<T> createDatumReader(AvroType<T> avroType) {
-    if (avroType.isSpecific()) {
+    if (avroType.hasSpecific()) {
       return new SpecificDatumReader<T>(avroType.getSchema());
     } else if (avroType.isGeneric()) {
       return new GenericDatumReader<T>(avroType.getSchema());

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/89e58b3e/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
index 0ce4c06..2226556 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -35,7 +35,7 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
 
   public AvroFileSource(Path path, AvroType<T> ptype) {
     super(path, ptype, new InputBundle(AvroInputFormat.class)
-        .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.isReflect()))
+        .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect()))
         .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
         .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/89e58b3e/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
index f4e407a..e15581d 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -82,8 +82,12 @@ public class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
     String schemaJson = att.getSchema().toString();
     Configuration conf = job.getConfiguration();
 
-    if (att.isReflect()) {
-      conf.setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true);
+    if (att.hasReflect()) {
+      if (att.hasSpecific()) {
+        Avros.checkCombiningSpecificAndReflectionSchemas();
+      } else {
+        conf.setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true);
+      }
     }
     conf.set(AvroJob.MAP_OUTPUT_SCHEMA, schemaJson);
     job.setSortComparatorClass(AvroKeyComparator.class);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/89e58b3e/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
index 7aaec25..4997157 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -91,28 +91,26 @@ public class AvroType<T> implements PType<T> {
   }
 
   /**
-   * Determine if the wrapped type is a specific data avro type.
+   * Determine if the wrapped type is a specific data avro type or wraps one.
    * 
-   * @return true if the wrapped type is a specific data type
+   * @return true if the wrapped type is a specific data type or wraps one
    */
-  public boolean isSpecific() {
+  public boolean hasSpecific() {
     if (Avros.isPrimitive(this)) {
       return false;
     }
-
-    boolean hasSpecific = false;
+    
     if (!this.subTypes.isEmpty()) {
       for (PType<?> subType : this.subTypes) {
         AvroType<?> atype = (AvroType<?>) subType;
-        if (atype.isReflect()) {
-          return false;
-        } else if (atype.isSpecific()) {
-          hasSpecific = true;
+        if (atype.hasSpecific()) {
+          return true;
         }
       }
+      return false;
     }
     
-    return hasSpecific || SpecificRecord.class.isAssignableFrom(typeClass);
+    return SpecificRecord.class.isAssignableFrom(typeClass);
   }
 
   /**
@@ -125,18 +123,18 @@ public class AvroType<T> implements PType<T> {
   }
 
   /**
-   * Determine if the wrapped type is a reflection-based avro type.
+   * Determine if the wrapped type is a reflection-based avro type or wraps one.
    * 
-   * @return true if the wrapped type is a reflection-based type
+   * @return true if the wrapped type is a reflection-based type or wraps one.
    */
-  public boolean isReflect() {
+  public boolean hasReflect() {
     if (Avros.isPrimitive(this)) {
       return false;
     }
 
     if (!this.subTypes.isEmpty()) {
       for (PType<?> subType : this.subTypes) {
-        if (((AvroType<?>) subType).isReflect()) {
+        if (((AvroType<?>) subType).hasReflect()) {
           return true;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/89e58b3e/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index 24391ed..b3a9b7a 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -71,6 +71,14 @@ import com.google.common.collect.Maps;
 public class Avros {
 
   /**
+   * Older versions of Avro (i.e., before 1.7.0) do not support schemas that are composed of
+   * a mix of specific and reflection-based schemas. This bit controls whether or not we
+   * allow Crunch jobs to be created that involve mixing specific and reflection-based schemas
+   * and can be overridden by the client developer.
+   */
+  public static boolean CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS = false;
+  
+  /**
    * The instance we use for generating reflected schemas. May be modified by
    * clients (e.g., Scrunch.)
    */
@@ -91,6 +99,18 @@ public class Avros {
         conf.getClass(REFLECT_DATA_FACTORY_CLASS, ReflectDataFactory.class), conf);
   }
 
+  public static void checkCombiningSpecificAndReflectionSchemas() {
+    if (!CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS) {
+      throw new IllegalStateException("Crunch does not support running jobs that"
+          + " contain a mixture of reflection-based and avro-generated data types."
+          + " Please consider turning your reflection-based type into an avro-generated"
+          + " type and using that generated type instead."
+          + " If the version of Avro you are using is 1.7.0 or greater, you can enable"
+          + " combined schemas by setting the Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS"
+          + " field to 'true'.");
+    }
+  }
+  
   public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() {
     @Override
     public String map(CharSequence input) {
@@ -477,13 +497,20 @@ public class Avros {
       this.avroTypes = Lists.newArrayList();
       this.jsonSchema = schema.toString();
       boolean reflectFound = false;
+      boolean specificFound = false;
       for (PType ptype : ptypes) {
         AvroType atype = (AvroType) ptype;
         fns.add(atype.getOutputMapFn());
         avroTypes.add(atype);
-        if (atype.isReflect()) {
+        if (atype.hasReflect()) {
           reflectFound = true;
         }
+        if (atype.hasSpecific()) {
+          specificFound = true;
+        }
+      }
+      if (specificFound && reflectFound) {
+        checkCombiningSpecificAndReflectionSchemas();
       }
       this.isReflect = reflectFound;
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/89e58b3e/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
index 37ed801..5e03ff8 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
@@ -53,17 +53,17 @@ public class AvroTableTypeTest {
 
   @Test
   public void testIsReflect_ContainsReflectKey() {
-    assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).isReflect());
+    assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).hasReflect());
   }
 
   @Test
   public void testIsReflect_ContainsReflectValue() {
-    assertTrue(Avros.tableOf(Avros.ints(), Avros.reflects(StringWrapper.class)).isReflect());
+    assertTrue(Avros.tableOf(Avros.ints(), Avros.reflects(StringWrapper.class)).hasReflect());
   }
 
   @Test
   public void testReflect_NoReflectKeyOrValue() {
-    assertFalse(Avros.tableOf(Avros.ints(), Avros.ints()).isReflect());
+    assertFalse(Avros.tableOf(Avros.ints(), Avros.ints()).hasReflect());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/89e58b3e/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
index 955467c..170bebf 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
@@ -42,7 +42,7 @@ public class AvroTypeTest {
 
   @Test
   public void testIsSpecific_SpecificData() {
-    assertTrue(Avros.records(Person.class).isSpecific());
+    assertTrue(Avros.records(Person.class).hasSpecific());
   }
 
   @Test
@@ -52,7 +52,7 @@ public class AvroTypeTest {
 
   @Test
   public void testIsSpecific_GenericData() {
-    assertFalse(Avros.generics(Person.SCHEMA$).isSpecific());
+    assertFalse(Avros.generics(Person.SCHEMA$).hasSpecific());
   }
 
   @Test
@@ -62,7 +62,7 @@ public class AvroTypeTest {
 
   @Test
   public void testIsSpecific_NonAvroClass() {
-    assertFalse(Avros.ints().isSpecific());
+    assertFalse(Avros.ints().hasSpecific());
   }
 
   @Test
@@ -72,7 +72,7 @@ public class AvroTypeTest {
 
   @Test
   public void testIsSpecific_SpecificAvroTable() {
-    assertTrue(Avros.tableOf(Avros.strings(), Avros.records(Person.class)).isSpecific());
+    assertTrue(Avros.tableOf(Avros.strings(), Avros.records(Person.class)).hasSpecific());
   }
 
   @Test
@@ -82,7 +82,7 @@ public class AvroTypeTest {
 
   @Test
   public void testIsSpecific_GenericAvroTable() {
-    assertFalse(Avros.tableOf(Avros.strings(), Avros.generics(Person.SCHEMA$)).isSpecific());
+    assertFalse(Avros.tableOf(Avros.strings(), Avros.generics(Person.SCHEMA$)).hasSpecific());
   }
 
   @Test
@@ -92,52 +92,52 @@ public class AvroTypeTest {
 
   @Test
   public void testIsReflect_GenericType() {
-    assertFalse(Avros.generics(Person.SCHEMA$).isReflect());
+    assertFalse(Avros.generics(Person.SCHEMA$).hasReflect());
   }
 
   @Test
   public void testIsReflect_SpecificType() {
-    assertFalse(Avros.records(Person.class).isReflect());
+    assertFalse(Avros.records(Person.class).hasReflect());
   }
 
   @Test
   public void testIsReflect_ReflectSimpleType() {
-    assertTrue(Avros.reflects(StringWrapper.class).isReflect());
+    assertTrue(Avros.reflects(StringWrapper.class).hasReflect());
   }
 
   @Test
   public void testIsReflect_NonReflectSubType() {
-    assertFalse(Avros.pairs(Avros.ints(), Avros.ints()).isReflect());
+    assertFalse(Avros.pairs(Avros.ints(), Avros.ints()).hasReflect());
   }
 
   @Test
   public void testIsReflect_ReflectSubType() {
-    assertTrue(Avros.pairs(Avros.ints(), Avros.reflects(StringWrapper.class)).isReflect());
+    assertTrue(Avros.pairs(Avros.ints(), Avros.reflects(StringWrapper.class)).hasReflect());
   }
 
   @Test
   public void testIsReflect_TableOfNonReflectTypes() {
-    assertFalse(Avros.tableOf(Avros.ints(), Avros.strings()).isReflect());
+    assertFalse(Avros.tableOf(Avros.ints(), Avros.strings()).hasReflect());
   }
 
   @Test
   public void testIsReflect_TableWithReflectKey() {
-    assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).isReflect());
+    assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).hasReflect());
   }
 
   @Test
   public void testIsReflect_TableWithReflectValue() {
-    assertTrue(Avros.tableOf(Avros.ints(), Avros.reflects(StringWrapper.class)).isReflect());
+    assertTrue(Avros.tableOf(Avros.ints(), Avros.reflects(StringWrapper.class)).hasReflect());
   }
 
   @Test
   public void testReflect_CollectionContainingReflectValue() {
-    assertTrue(Avros.collections(Avros.reflects(StringWrapper.class)).isReflect());
+    assertTrue(Avros.collections(Avros.reflects(StringWrapper.class)).hasReflect());
   }
 
   @Test
   public void testReflect_CollectionNotContainingReflectValue() {
-    assertFalse(Avros.collections(Avros.generics(Person.SCHEMA$)).isReflect());
+    assertFalse(Avros.collections(Avros.generics(Person.SCHEMA$)).hasReflect());
   }
 
   @Test


Mime
View raw message