pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roh...@apache.org
Subject svn commit: r1795450 - in /pig/branches/spark: src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java test/excluded-tests-spark
Date Wed, 17 May 2017 20:44:50 GMT
Author: rohini
Date: Wed May 17 20:44:50 2017
New Revision: 1795450

URL: http://svn.apache.org/viewvc?rev=1795450&view=rev
Log:
PIG-5134: Fix TestAvroStorage unit test in Spark mode (nkollar via rohini)

Modified:
    pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
    pig/branches/spark/test/excluded-tests-spark

Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java?rev=1795450&r1=1795449&r2=1795450&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java Wed May 17 20:44:50 2017
@@ -26,8 +26,17 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericEnumSymbol;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.util.Utf8;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -38,6 +47,8 @@ import org.apache.pig.data.TupleFactory;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
@@ -50,12 +61,12 @@ import java.util.Map;
 public final class AvroTupleWrapper <T extends IndexedRecord>
     implements Tuple {
     private static final Log LOG = LogFactory.getLog(AvroTupleWrapper.class);
-    private TupleFactory mTupleFactory = TupleFactory.getInstance();
+    private transient TupleFactory mTupleFactory = TupleFactory.getInstance();
 
   /**
    * The Avro object wrapped in the pig Tuple.
    */
-  private T avroObject;
+  private transient T avroObject;
 
   /**
    * Creates a new AvroTupleWrapper object.
@@ -205,7 +216,14 @@ public final class AvroTupleWrapper <T e
       case NULL:
         break;
       case STRING:
-        total += ((String) r.get(f.pos())).length()
+        Object val = r.get(f.pos());
+        String value;
+        if (val instanceof Utf8) {
+          value = val.toString();
+        } else {
+          value = (String) val;
+        }
+        total += value.length()
            * (Character.SIZE << bitsPerByte);
         break;
       case BYTES:
@@ -291,4 +309,21 @@ public final class AvroTupleWrapper <T e
         );
   }
 
+  // Required for Java serialization used by Spark: PIG-5134
+  private void writeObject(ObjectOutputStream out) throws IOException {
+    out.writeObject(avroObject.getSchema().toString());
+    DatumWriter<T> writer = new GenericDatumWriter<>();
+    writer.setSchema(avroObject.getSchema());
+    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+    writer.write(avroObject, encoder);
+    encoder.flush();
+  }
+
+  // Required for Java serialization used by Spark: PIG-5134
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    Schema schema = new Schema.Parser().parse((String) in.readObject());
+    DatumReader<T> reader = new GenericDatumReader<>(schema);
+    Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);
+    avroObject = reader.read(avroObject, decoder);
+  }
 }

Modified: pig/branches/spark/test/excluded-tests-spark
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/excluded-tests-spark?rev=1795450&r1=1795449&r2=1795450&view=diff
==============================================================================
--- pig/branches/spark/test/excluded-tests-spark (original)
+++ pig/branches/spark/test/excluded-tests-spark Wed May 17 20:44:50 2017
@@ -3,7 +3,5 @@
 **/TestNativeMapReduce.java
 **/TestCounters.java
 
-#TODO: PIG-5134 fix for Spark mode
-**/TestAvroStorage.java
 #TODO: PIG-5135 fix for Spark mode
 **/TestOrcStoragePushdown.java



Mime
View raw message