spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-10914] UnsafeRow serialization breaks when two machines have different Oops size.
Date Fri, 09 Oct 2015 00:25:17 GMT
Repository: spark
Updated Branches:
  refs/heads/master 02149ff08 -> 84ea28717


[SPARK-10914] UnsafeRow serialization breaks when two machines have different Oops size.

UnsafeRow contains 3 pieces of information when pointing to some data in memory (an object,
a base offset, and length). When the row is serialized with Java/Kryo serialization, the object
layout in memory can change if two machines have different pointer width (Oops in JVM).

To reproduce, launch Spark using

MASTER=local-cluster[2,1,1024] bin/spark-shell --conf "spark.executor.extraJavaOptions=-XX:-UseCompressedOops"

And then run the following

scala> sql("select 1 xx").collect()

Author: Reynold Xin <rxin@databricks.com>

Closes #9030 from rxin/SPARK-10914.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84ea2871
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84ea2871
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84ea2871

Branch: refs/heads/master
Commit: 84ea287178247c163226e835490c9c70b17d8d3b
Parents: 02149ff
Author: Reynold Xin <rxin@databricks.com>
Authored: Thu Oct 8 17:25:14 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Thu Oct 8 17:25:14 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/UnsafeRow.java     | 47 ++++++++++++++++++--
 .../org/apache/spark/sql/UnsafeRowSuite.scala   | 29 +++++++++++-
 2 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/84ea2871/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index e8ac299..5af7ed5 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -17,8 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions;
 
-import java.io.IOException;
-import java.io.OutputStream;
+import java.io.*;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Arrays;
@@ -26,6 +25,11 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
@@ -35,6 +39,7 @@ import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 import static org.apache.spark.sql.types.DataTypes.*;
+import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
 
 /**
  * An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
@@ -52,7 +57,7 @@ import static org.apache.spark.sql.types.DataTypes.*;
  *
  * Instances of `UnsafeRow` act as pointers to row data stored in this format.
  */
-public final class UnsafeRow extends MutableRow {
+public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable
{
 
   //////////////////////////////////////////////////////////////////////////////
   // Static methods
@@ -596,4 +601,40 @@ public final class UnsafeRow extends MutableRow {
   public void writeToMemory(Object target, long targetOffset) {
     Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
   }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    byte[] bytes = getBytes();
+    out.writeInt(bytes.length);
+    out.writeInt(this.numFields);
+    out.write(bytes);
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    this.baseOffset = BYTE_ARRAY_OFFSET;
+    this.sizeInBytes = in.readInt();
+    this.numFields = in.readInt();
+    this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
+    this.baseObject = new byte[sizeInBytes];
+    in.readFully((byte[]) baseObject);
+  }
+
+  @Override
+  public void write(Kryo kryo, Output out) {
+    byte[] bytes = getBytes();
+    out.writeInt(bytes.length);
+    out.writeInt(this.numFields);
+    out.write(bytes);
+  }
+
+  @Override
+  public void read(Kryo kryo, Input in) {
+    this.baseOffset = BYTE_ARRAY_OFFSET;
+    this.sizeInBytes = in.readInt();
+    this.numFields = in.readInt();
+    this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
+    this.baseObject = new byte[sizeInBytes];
+    in.read((byte[]) baseObject);
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/84ea2871/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
index 944d4e1..7d1ee39 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql
 
 import java.io.ByteArrayOutputStream
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.{KryoSerializer, JavaSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
 import org.apache.spark.sql.types._
@@ -29,6 +30,32 @@ import org.apache.spark.unsafe.types.UTF8String
 
 class UnsafeRowSuite extends SparkFunSuite {
 
+  test("UnsafeRow Java serialization") {
+    // serializing an UnsafeRow pointing to a large buffer should only serialize the relevant
data
+    val data = new Array[Byte](1024)
+    val row = new UnsafeRow
+    row.pointTo(data, 1, 16)
+    row.setLong(0, 19285)
+
+    val ser = new JavaSerializer(new SparkConf).newInstance()
+    val row1 = ser.deserialize[UnsafeRow](ser.serialize(row))
+    assert(row1.getLong(0) == 19285)
+    assert(row1.getBaseObject().asInstanceOf[Array[Byte]].length == 16)
+  }
+
+  test("UnsafeRow Kryo serialization") {
+    // serializing an UnsafeRow pointing to a large buffer should only serialize the relevant
data
+    val data = new Array[Byte](1024)
+    val row = new UnsafeRow
+    row.pointTo(data, 1, 16)
+    row.setLong(0, 19285)
+
+    val ser = new KryoSerializer(new SparkConf).newInstance()
+    val row1 = ser.deserialize[UnsafeRow](ser.serialize(row))
+    assert(row1.getLong(0) == 19285)
+    assert(row1.getBaseObject().asInstanceOf[Array[Byte]].length == 16)
+  }
+
   test("bitset width calculation") {
     assert(UnsafeRow.calculateBitSetWidthInBytes(0) === 0)
     assert(UnsafeRow.calculateBitSetWidthInBytes(1) === 8)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message