crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-436: Support AvroMode-based serialization for Crunch-on-Spark
Date Sun, 24 Aug 2014 19:38:46 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 9e2ecba71 -> 67de89ab0


CRUNCH-436: Support AvroMode-based serialization for Crunch-on-Spark


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/67de89ab
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/67de89ab
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/67de89ab

Branch: refs/heads/apache-crunch-0.8
Commit: 67de89ab06f14bc8063770608cc206c1148ec7a9
Parents: 9e2ecba
Author: Josh Wills <jwills@apache.org>
Authored: Sat Jul 5 14:09:08 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Sun Aug 24 12:25:52 2014 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/types/avro/AvroMode.java  | 16 ++++++++
 .../java/org/apache/crunch/SparkPageRankIT.java |  5 ++-
 .../impl/spark/collect/PGroupedTableImpl.java   | 14 ++++++-
 .../crunch/impl/spark/serde/AvroSerDe.java      | 39 ++++++++++++--------
 .../apache/crunch/impl/spark/serde/SerDe.java   |  2 +
 5 files changed, 56 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/67de89ab/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
index 9653f25..0c38105 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
@@ -18,6 +18,7 @@
 
 package org.apache.crunch.types.avro;
 
+import com.google.common.collect.Maps;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
@@ -34,6 +35,8 @@ import org.apache.crunch.io.FormatBundle;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import java.util.Map;
+
 /**
  * AvroMode is an immutable object used for configuring the reading and writing of Avro types.
  * The mode will not be used or honored unless it has been appropriately configured using one of the supported
@@ -310,6 +313,19 @@ public class AvroMode implements ReaderWriterFactory {
   }
 
   /**
+   * Returns the entries that a {@code Configuration} instance needs to enable
+   * this AvroMode as a serializable map of key-value pairs.
+   */
+  public Map<String, String> getModeProperties() {
+    Map<String, String> props = Maps.newHashMap();
+    props.put(AVRO_MODE_PROPERTY, this.modeType.toString());
+    if (factory != null) {
+      props.put(propName, factory.getClass().getCanonicalName());
+    }
+    return props;
+  }
+
+  /**
    * Populates the {@code conf} with mode specific settings.
    * @param conf the configuration to populate.
    * @deprecated use {@link #configure(org.apache.hadoop.conf.Configuration)}

http://git-wip-us.apache.org/repos/asf/crunch/blob/67de89ab/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java
index c76c62a..47b9300 100644
--- a/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java
@@ -26,6 +26,7 @@ import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.PTypes;
 import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.junit.Before;
 import org.junit.Rule;
@@ -76,9 +77,9 @@ public class SparkPageRankIT {
   }
 
   @Test
-  public void testAvroJSON() throws Exception {
+  public void testAvroReflects() throws Exception {
     PTypeFamily tf = AvroTypeFamily.getInstance();
-    PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
+    PType<PageRankData> prType = Avros.reflects(PageRankData.class);
     String urlInput = tmpDir.copyResourceFileName("urls.txt");
     run(pipeline, urlInput, prType, tf);
     pipeline.done();

http://git-wip-us.apache.org/repos/asf/crunch/blob/67de89ab/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java
index 95811cf..4de50b8 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java
@@ -39,14 +39,18 @@ import org.apache.crunch.impl.spark.serde.AvroSerDe;
 import org.apache.crunch.impl.spark.serde.SerDe;
 import org.apache.crunch.impl.spark.serde.WritableSerDe;
 import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroMode;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.writable.WritableType;
 import org.apache.crunch.util.PartitionUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.storage.StorageLevel;
 
 import java.util.List;
+import java.util.Map;
 
 public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements SparkCollection {
 
@@ -70,6 +74,12 @@ public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements S
     return rdd;
   }
 
+  private AvroSerDe getAvroSerde(PType ptype, Configuration conf) {
+    AvroType at = (AvroType) ptype;
+    Map<String, String> props = AvroMode.fromType(at).withFactoryFromConfiguration(conf).getModeProperties();
+    return new AvroSerDe(at, props);
+  }
+
   private JavaRDDLike<?, ?> getJavaRDDLikeInternal(SparkRuntime runtime, CombineFn<K, V> combineFn) {
     JavaPairRDD<K, V> parentRDD = (JavaPairRDD<K, V>) ((SparkCollection)getOnlyParent()).getJavaRDDLike(runtime);
     if (combineFn != null) {
@@ -78,8 +88,8 @@ public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements S
     SerDe keySerde, valueSerde;
     PTableType<K, V> parentType = ptype.getTableType();
     if (parentType instanceof AvroType) {
-      keySerde = new AvroSerDe((AvroType) parentType.getKeyType());
-      valueSerde = new AvroSerDe((AvroType) parentType.getValueType());
+      keySerde = getAvroSerde(parentType.getKeyType(), runtime.getConfiguration());
+      valueSerde = getAvroSerde(parentType.getValueType(), runtime.getConfiguration());
     } else {
       keySerde = new WritableSerDe(((WritableType) parentType.getKeyType()).getSerializationClass());
       valueSerde = new WritableSerDe(((WritableType) parentType.getValueType()).getSerializationClass());

http://git-wip-us.apache.org/repos/asf/crunch/blob/67de89ab/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
index e6e08a0..f82ba8e 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
@@ -26,53 +26,60 @@ import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.mapred.AvroWrapper;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.crunch.types.avro.AvroMode;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
 
 import javax.annotation.Nullable;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.Map;
 
 public class AvroSerDe<T> implements SerDe<T> {
 
   private AvroType<T> avroType;
+  private Map<String, String> modeProperties;
+  private transient AvroMode mode;
   private transient DatumWriter<T> writer;
   private transient DatumReader<T> reader;
 
-  public AvroSerDe(AvroType<T> avroType) {
+  public AvroSerDe(AvroType<T> avroType, Map<String, String> modeProperties) {
     this.avroType = avroType;
+    this.modeProperties = modeProperties;
     if (avroType.hasReflect() && avroType.hasSpecific()) {
       Avros.checkCombiningSpecificAndReflectionSchemas();
     }
   }
 
+  private AvroMode getMode() {
+    if (mode == null) {
+      mode = AvroMode.fromType(avroType);
+      if (modeProperties != null && !modeProperties.isEmpty()) {
+        Configuration conf = new Configuration();
+        for (Map.Entry<String, String> e : modeProperties.entrySet()) {
+          conf.set(e.getKey(), e.getValue());
+        }
+        mode = mode.withFactoryFromConfiguration(conf);
+      }
+    }
+    return mode;
+  }
+
   private DatumWriter<T> getWriter() {
     if (writer == null) {
-      if (avroType.hasReflect()) {
-        writer = new ReflectDatumWriter<T>(avroType.getSchema());
-      } else if (avroType.hasSpecific()) {
-        writer = new SpecificDatumWriter<T>(avroType.getSchema());
-      } else {
-        writer = new GenericDatumWriter<T>(avroType.getSchema());
-      }
+      writer = getMode().getWriter(avroType.getSchema());
     }
     return writer;
   }
 
   private DatumReader<T> getReader() {
     if (reader == null) {
-      if (avroType.hasReflect()) {
-        reader = new ReflectDatumReader<T>(avroType.getSchema());
-      } else if (avroType.hasSpecific()) {
-        reader = new SpecificDatumReader<T>(avroType.getSchema());
-      } else {
-        reader = new GenericDatumReader<T>(avroType.getSchema());
-      }
+      reader = getMode().getReader(avroType.getSchema());
     }
     return reader;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/67de89ab/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java
index d374a41..887f656 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java
@@ -18,10 +18,12 @@
 package org.apache.crunch.impl.spark.serde;
 
 import com.google.common.base.Function;
+import org.apache.hadoop.conf.Configuration;
 
 import java.io.Serializable;
 
 public interface SerDe<T> extends Serializable {
+
   byte[] toBytes(T obj) throws Exception;
 
   T fromBytes(byte[] bytes);


Mime
View raw message