crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-314: Separate shuffle and bundle AvroMode configuration.
Date Sat, 21 Dec 2013 20:17:51 GMT
Updated Branches:
  refs/heads/master 316ccb6bf -> 58eb227d7


CRUNCH-314: Separate shuffle and bundle AvroMode configuration.

This adds an integration test, AvroModeIT, that catches the behavior
described in CRUNCH-314. The solution is to separate the
AvroMode#configure methods into configure, for sources and targets, and
configureShuffle, for SafeAvroSerialization and AvroGroupedTableType.

AvroDeepCopier and Avros also used a configure method to set the reflect
factory, which has been updated to the more specific configureFactory.

This also changes the default AvroMode to REFLECT because it is the most
general.

Signed-off-by: Josh Wills <jwills@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/58eb227d
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/58eb227d
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/58eb227d

Branch: refs/heads/master
Commit: 58eb227d78e6b1a32a334f83913df275ea1c7811
Parents: 316ccb6
Author: Ryan Blue <rblue@cloudera.com>
Authored: Fri Dec 20 18:12:48 2013 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Sat Dec 21 11:31:07 2013 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/io/avro/AvroModeIT.java   | 144 +++++++++++++++++++
 crunch-core/src/it/resources/strings-100.avro   | Bin 0 -> 451 bytes
 .../crunch/types/avro/AvroDeepCopier.java       |   5 +-
 .../crunch/types/avro/AvroGroupedTableType.java |   2 +-
 .../org/apache/crunch/types/avro/AvroMode.java  |  25 +++-
 .../org/apache/crunch/types/avro/Avros.java     |   2 +-
 .../types/avro/SafeAvroSerialization.java       |   4 +-
 pom.xml                                         |   1 +
 8 files changed, 171 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java
new file mode 100644
index 0000000..ff66fd7
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.io.avro;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Random;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.AvroMode;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AvroModeIT implements Serializable {
+
+  public static final Schema GENERIC_SCHEMA = new Schema.Parser().parse("{\n" +
+      "  \"name\": \"mystring\",\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"fields\": [\n" +
+      "    { \"name\": \"text\", \"type\": \"string\" }\n" +
+      "  ]\n" +
+      "}");
+
+  static final class FloatArray {
+    private final float[] values;
+    FloatArray() {
+      this(null);
+    }
+    FloatArray(float[] values) {
+      this.values = values;
+    }
+    float[] getValues() {
+      return values;
+    }
+  }
+
+  public static AvroType<float[]> FLOAT_ARRAY = Avros.derived(float[].class,
+      new MapFn<FloatArray, float[]>() {
+        @Override
+        public float[] map(FloatArray input) {
+          return input.getValues();
+        }
+      },
+      new MapFn<float[], FloatArray>() {
+        @Override
+        public FloatArray map(float[] input) {
+          return new FloatArray(input);
+        }
+      }, Avros.reflects(FloatArray.class));
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testGenericReflectConflict() throws IOException {
+    final Random rand = new Random();
+    rand.setSeed(12345);
+    Configuration conf = new Configuration();
+    Pipeline pipeline = new MRPipeline(AvroModeIT.class, conf);
+    Source<GenericData.Record> source = From.avroFile(
+        tmpDir.copyResourceFileName("strings-100.avro"),
+        Avros.generics(GENERIC_SCHEMA));
+    PTable<Long, float[]> mapPhase = pipeline
+        .read(source)
+        .parallelDo(new DoFn<GenericData.Record, Pair<Long, float[]>>() {
+          @Override
+          public void process(GenericData.Record input, Emitter<Pair<Long, float[]>> emitter) {
+            emitter.emit(Pair.of(
+                Long.valueOf(input.get("text").toString().length()),
+                new float[] {rand.nextFloat(), rand.nextFloat()}));
+          }
+        }, Avros.tableOf(Avros.longs(), FLOAT_ARRAY));
+
+    PTable<Long, float[]> result = mapPhase
+        .groupByKey()
+        .combineValues(new Aggregator<float[]>() {
+          float[] accumulator = null;
+
+          @Override
+          public Iterable<float[]> results() {
+            return ImmutableList.of(accumulator);
+          }
+
+          @Override
+          public void initialize(Configuration conf) {
+          }
+
+          @Override
+          public void reset() {
+            this.accumulator = null;
+          }
+
+          @Override
+          public void update(float[] value) {
+            if (accumulator == null) {
+              accumulator = Arrays.copyOf(value, 2);
+            } else {
+              for (int i = 0; i < value.length; i += 1) {
+                accumulator[i] += value[i];
+              }
+            }
+          }
+        });
+
+    pipeline.writeTextFile(result, tmpDir.getFileName("unused"));
+    Assert.assertTrue("Should succeed", pipeline.done().succeeded());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/crunch-core/src/it/resources/strings-100.avro
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/strings-100.avro b/crunch-core/src/it/resources/strings-100.avro
new file mode 100755
index 0000000..c968b97
Binary files /dev/null and b/crunch-core/src/it/resources/strings-100.avro differ

http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index 9e4b0a1..21dae45 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -152,13 +152,14 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
 
     @Override
     protected DatumReader<T> createDatumReader(Configuration conf) {
-      AvroMode.REFLECT.configure(conf);
+      AvroMode.REFLECT.configureFactory(conf);
       return AvroMode.REFLECT.getReader(getSchema());
     }
 
     @Override
     protected DatumWriter<T> createDatumWriter(Configuration conf) {
-      return AvroMode.fromConfiguration(conf).getWriter(getSchema());
+      AvroMode.REFLECT.setFromConfiguration(conf);
+      return AvroMode.REFLECT.getWriter(getSchema());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
index 62e6db4..a97f917 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -101,7 +101,7 @@ class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
       options.configure(job);
     }
 
-    AvroMode.fromType(att).configure(conf);
+    AvroMode.fromType(att).configureShuffle(conf);
 
     Collection<String> serializations = job.getConfiguration().getStringCollection(
         "io.serializations");

http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
index 77eece1..e2646cd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
@@ -40,9 +40,16 @@ public enum AvroMode implements ReaderWriterFactory {
   GENERIC ("crunch.genericfactory");
 
   public static final String AVRO_MODE_PROPERTY = "crunch.avro.mode";
+  public static final String AVRO_SHUFFLE_MODE_PROPERTY = "crunch.avro.shuffle.mode";
 
   public static AvroMode fromConfiguration(Configuration conf) {
-    AvroMode mode = conf.getEnum(AVRO_MODE_PROPERTY, GENERIC);
+    AvroMode mode = conf.getEnum(AVRO_MODE_PROPERTY, REFLECT);
+    mode.setFromConfiguration(conf);
+    return mode;
+  }
+
+  public static AvroMode fromShuffleConfiguration(Configuration conf) {
+    AvroMode mode = conf.getEnum(AVRO_SHUFFLE_MODE_PROPERTY, REFLECT);
     mode.setFromConfiguration(conf);
     return mode;
   }
@@ -137,11 +144,9 @@ public enum AvroMode implements ReaderWriterFactory {
     }
   }
 
-  public void configure(Configuration conf) {
-    conf.setEnum(AVRO_MODE_PROPERTY, this);
-    if (factory != null) {
-      conf.setClass(propName, factory.getClass(), ReaderWriterFactory.class);
-    }
+  public void configureShuffle(Configuration conf) {
+    conf.setEnum(AVRO_SHUFFLE_MODE_PROPERTY, this);
+    configureFactory(conf);
   }
 
   public void configure(FormatBundle bundle) {
@@ -151,8 +156,16 @@ public enum AvroMode implements ReaderWriterFactory {
     }
   }
 
+  public void configureFactory(Configuration conf) {
+    if (factory != null) {
+      conf.setClass(propName, factory.getClass(), ReaderWriterFactory.class);
+    }
+  }
+
   @SuppressWarnings("unchecked")
   void setFromConfiguration(Configuration conf) {
+    // although the shuffle and input/output use different properties for mode,
+    // this is shared - only one ReaderWriterFactory can be used.
     Class<?> factoryClass = conf.getClass(propName, this.getClass());
     if (factoryClass != this.getClass()) {
       this.factory = (ReaderWriterFactory)

http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 3d6b04f..2cf63e8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -116,7 +116,7 @@ public class Avros {
   @Deprecated
   public static void configureReflectDataFactory(Configuration conf) {
     AvroMode.REFLECT.override(REFLECT_DATA_FACTORY);
-    AvroMode.REFLECT.configure(conf);
+    AvroMode.REFLECT.configureFactory(conf);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
index 7e323f1..1535a61 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
@@ -60,7 +60,7 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW
     if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) {
       datumReader = AvroMode.REFLECT.getReader(schema);
     } else {
-      datumReader = AvroMode.fromConfiguration(conf).getReader(schema);
+      datumReader = AvroMode.fromShuffleConfiguration(conf).getReader(schema);
     }
     return new AvroWrapperDeserializer(datumReader, isKey);
   }
@@ -105,7 +105,7 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW
     Schema schema = isFinalOutput ? AvroJob.getOutputSchema(conf) : (AvroKey.class.isAssignableFrom(c) ? Pair
         .getKeySchema(AvroJob.getMapOutputSchema(conf)) : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf)));
 
-    ReaderWriterFactory factory = AvroMode.fromConfiguration(conf);
+    ReaderWriterFactory factory = AvroMode.fromShuffleConfiguration(conf);
     DatumWriter<T> writer = factory.getWriter(schema);
     return new AvroWrapperSerializer(writer);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ad14ce4..cba17a7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -603,6 +603,7 @@ under the License.
             <exclude>.idea/**</exclude>
             <exclude>**/resources/*.txt</exclude>
             <exclude>**/resources/**/*.txt</exclude>
+            <exclude>**/resources/*.avro</exclude>
             <exclude>**/goal.txt</exclude>
             <exclude>**/target/generated-test-sources/**</exclude>
             <exclude>**/scripts/scrunch</exclude>


Mime
View raw message