spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject [1/2] spark git commit: Revert [SPARK-10399] [SPARK-23879] [SPARK-23762] [SPARK-25317]
Date Sun, 09 Sep 2018 13:26:12 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 8f7d8a097 -> a00a160e1


http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 717823e..75690ae 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -26,6 +26,7 @@ import org.apache.spark.TaskContext;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.UnsafeAlignedOffset;
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
@@ -215,7 +216,12 @@ public final class UnsafeInMemorySorter {
     if (newArray.size() < array.size()) {
       throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
     }
-    MemoryBlock.copyMemory(array.memoryBlock(), newArray.memoryBlock(), pos * 8L);
+    Platform.copyMemory(
+      array.getBaseObject(),
+      array.getBaseOffset(),
+      newArray.getBaseObject(),
+      newArray.getBaseOffset(),
+      pos * 8L);
     consumer.freeArray(array);
     array = newArray;
     usableCapacity = getUsableCapacity();
@@ -342,7 +348,10 @@ public final class UnsafeInMemorySorter {
           array, nullBoundaryPos, (pos - nullBoundaryPos) / 2L, 0, 7,
           radixSortSupport.sortDescending(), radixSortSupport.sortSigned());
       } else {
-        MemoryBlock unused = array.memoryBlock().subBlock(pos * 8L, (array.size() - pos) * 8L);
+        MemoryBlock unused = new MemoryBlock(
+          array.getBaseObject(),
+          array.getBaseOffset() + pos * 8L,
+          (array.size() - pos) * 8L);
         LongArray buffer = new LongArray(unused);
         Sorter<RecordPointerAndKeyPrefix, LongArray> sorter =
           new Sorter<>(new UnsafeSortDataFormat(buffer));

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index d7d2d0b..a0664b3 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -76,7 +76,7 @@ public class TaskMemoryManagerSuite {
     final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
     final MemoryBlock dataPage = manager.allocatePage(256, c);
     c.freePage(dataPage);
-    Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, dataPage.getPageNumber());
+    Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, dataPage.pageNumber);
   }
 
   @Test(expected = AssertionError.class)

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 3e56db5..47173b8 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark._
 import org.apache.spark.memory.MemoryTestingUtils
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.unsafe.array.LongArray
-import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
+import org.apache.spark.unsafe.memory.MemoryBlock
 import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordPointerAndKeyPrefix, UnsafeSortDataFormat}
 
 class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
@@ -105,8 +105,9 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     // the form [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999]
     // that can trigger copyRange() in TimSort.mergeLo() or TimSort.mergeHi()
     val ref = Array.tabulate[Long](size) { i => if (i < size / 2) size / 2 + i else i }
-    val buf = new LongArray(OnHeapMemoryBlock.fromArray(ref))
-    val tmpBuf = new LongArray(new OnHeapMemoryBlock((size/2) * 8L))
+    val buf = new LongArray(MemoryBlock.fromLongArray(ref))
+    val tmp = new Array[Long](size/2)
+    val tmpBuf = new LongArray(MemoryBlock.fromLongArray(tmp))
 
     new Sorter(new UnsafeSortDataFormat(tmpBuf)).sort(
       buf, 0, size, new Comparator[RecordPointerAndKeyPrefix] {

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
index ddf3740..d5956ea 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
@@ -27,7 +27,7 @@ import com.google.common.primitives.Ints
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.unsafe.array.LongArray
-import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
+import org.apache.spark.unsafe.memory.MemoryBlock
 import org.apache.spark.util.collection.Sorter
 import org.apache.spark.util.random.XORShiftRandom
 
@@ -78,14 +78,14 @@ class RadixSortSuite extends SparkFunSuite with Logging {
   private def generateTestData(size: Long, rand: => Long): (Array[JLong], LongArray) = {
     val ref = Array.tabulate[Long](Ints.checkedCast(size)) { i => rand }
     val extended = ref ++ Array.fill[Long](Ints.checkedCast(size))(0)
-    (ref.map(i => new JLong(i)), new LongArray(OnHeapMemoryBlock.fromArray(extended)))
+    (ref.map(i => new JLong(i)), new LongArray(MemoryBlock.fromLongArray(extended)))
   }
 
   private def generateKeyPrefixTestData(size: Long, rand: => Long): (LongArray, LongArray) = {
     val ref = Array.tabulate[Long](Ints.checkedCast(size * 2)) { i => rand }
     val extended = ref ++ Array.fill[Long](Ints.checkedCast(size * 2))(0)
-    (new LongArray(OnHeapMemoryBlock.fromArray(ref)),
-     new LongArray(OnHeapMemoryBlock.fromArray(extended)))
+    (new LongArray(MemoryBlock.fromLongArray(ref)),
+     new LongArray(MemoryBlock.fromLongArray(extended)))
   }
 
   private def collectToArray(array: LongArray, offset: Int, length: Long): Array[Long] = {
@@ -110,7 +110,7 @@ class RadixSortSuite extends SparkFunSuite with Logging {
   }
 
   private def referenceKeyPrefixSort(buf: LongArray, lo: Long, hi: Long, refCmp: PrefixComparator) {
-    val sortBuffer = new LongArray(new OnHeapMemoryBlock(buf.size() * 8L))
+    val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt)))
     new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort(
       buf, Ints.checkedCast(lo), Ints.checkedCast(hi), new Comparator[RecordPointerAndKeyPrefix] {
         override def compare(

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
index dc38ee3..dc18e1d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
@@ -29,7 +29,7 @@ import org.apache.spark.mllib.feature.{HashingTF => OldHashingTF}
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.hash.Murmur3_x86_32.{hashInt, hashLong, hashUnsafeBytes2Block}
+import org.apache.spark.unsafe.hash.Murmur3_x86_32.{hashInt, hashLong, hashUnsafeBytes2}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.OpenHashMap
@@ -244,7 +244,8 @@ object FeatureHasher extends DefaultParamsReadable[FeatureHasher] {
       case f: Float => hashInt(java.lang.Float.floatToIntBits(f), seed)
       case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed)
       case s: String =>
-        hashUnsafeBytes2Block(UTF8String.fromString(s).getMemoryBlock, seed)
+        val utf8 = UTF8String.fromString(s)
+        hashUnsafeBytes2(utf8.getBaseObject, utf8.getBaseOffset, utf8.numBytes(), seed)
       case _ => throw new SparkException("FeatureHasher with murmur3 algorithm does not " +
         s"support type ${term.getClass.getCanonicalName} of input data.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala
index 7b73b28..8935c84 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala
@@ -160,7 +160,7 @@ object HashingTF {
       case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed)
       case s: String =>
         val utf8 = UTF8String.fromString(s)
-        hashUnsafeBytesBlock(utf8.getMemoryBlock(), seed)
+        hashUnsafeBytes(utf8.getBaseObject, utf8.getBaseOffset, utf8.numBytes(), seed)
       case _ => throw new SparkException("HashingTF with murmur3 algorithm does not " +
         s"support type ${term.getClass.getCanonicalName} of input data.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
index 9e7b15d..9002abd 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
@@ -27,7 +27,6 @@ import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
-import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
@@ -241,8 +240,7 @@ public final class UnsafeArrayData extends ArrayData {
     final long offsetAndSize = getLong(ordinal);
     final int offset = (int) (offsetAndSize >> 32);
     final int size = (int) offsetAndSize;
-    MemoryBlock mb = MemoryBlock.allocateFromObject(baseObject, baseOffset + offset, size);
-    return new UTF8String(mb);
+    return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 469b0e6..a76e6ef 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -37,7 +37,6 @@ import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
-import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
@@ -417,8 +416,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
     final long offsetAndSize = getLong(ordinal);
     final int offset = (int) (offsetAndSize >> 32);
     final int size = (int) offsetAndSize;
-    MemoryBlock mb = MemoryBlock.allocateFromObject(baseObject, baseOffset + offset, size);
-    return new UTF8String(mb);
+    return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java
index 8e9c0a2..eb5051b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.catalyst.expressions;
 
-import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.types.UTF8String;
 
 // scalastyle: off
@@ -72,13 +72,13 @@ public final class XXH64 {
     return fmix(hash);
   }
 
-  public long hashUnsafeWordsBlock(MemoryBlock mb) {
-    return hashUnsafeWordsBlock(mb, seed);
+  public long hashUnsafeWords(Object base, long offset, int length) {
+    return hashUnsafeWords(base, offset, length, seed);
   }
 
-  public static long hashUnsafeWordsBlock(MemoryBlock mb, long seed) {
-    assert (mb.size() % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)";
-    long hash = hashBytesByWordsBlock(mb, seed);
+  public static long hashUnsafeWords(Object base, long offset, int length, long seed) {
+    assert (length % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)";
+    long hash = hashBytesByWords(base, offset, length, seed);
     return fmix(hash);
   }
 
@@ -86,22 +86,20 @@ public final class XXH64 {
     return hashUnsafeBytes(base, offset, length, seed);
   }
 
-  public static long hashUnsafeBytesBlock(MemoryBlock mb, long seed) {
-    long offset = 0;
-    long length = mb.size();
+  public static long hashUnsafeBytes(Object base, long offset, int length, long seed) {
     assert (length >= 0) : "lengthInBytes cannot be negative";
-    long hash = hashBytesByWordsBlock(mb, seed);
+    long hash = hashBytesByWords(base, offset, length, seed);
     long end = offset + length;
     offset += length & -8;
 
     if (offset + 4L <= end) {
-      hash ^= (mb.getInt(offset) & 0xFFFFFFFFL) * PRIME64_1;
+      hash ^= (Platform.getInt(base, offset) & 0xFFFFFFFFL) * PRIME64_1;
       hash = Long.rotateLeft(hash, 23) * PRIME64_2 + PRIME64_3;
       offset += 4L;
     }
 
     while (offset < end) {
-      hash ^= (mb.getByte(offset) & 0xFFL) * PRIME64_5;
+      hash ^= (Platform.getByte(base, offset) & 0xFFL) * PRIME64_5;
       hash = Long.rotateLeft(hash, 11) * PRIME64_1;
       offset++;
     }
@@ -109,11 +107,7 @@ public final class XXH64 {
   }
 
   public static long hashUTF8String(UTF8String str, long seed) {
-    return hashUnsafeBytesBlock(str.getMemoryBlock(), seed);
-  }
-
-  public static long hashUnsafeBytes(Object base, long offset, int length, long seed) {
-    return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, length), seed);
+    return hashUnsafeBytes(str.getBaseObject(), str.getBaseOffset(), str.numBytes(), seed);
   }
 
   private static long fmix(long hash) {
@@ -125,31 +119,30 @@ public final class XXH64 {
     return hash;
   }
 
-  private static long hashBytesByWordsBlock(MemoryBlock mb, long seed) {
-    long offset = 0;
-    long length = mb.size();
+  private static long hashBytesByWords(Object base, long offset, int length, long seed) {
+    long end = offset + length;
     long hash;
     if (length >= 32) {
-      long limit = length - 32;
+      long limit = end - 32;
       long v1 = seed + PRIME64_1 + PRIME64_2;
       long v2 = seed + PRIME64_2;
       long v3 = seed;
       long v4 = seed - PRIME64_1;
 
       do {
-        v1 += mb.getLong(offset) * PRIME64_2;
+        v1 += Platform.getLong(base, offset) * PRIME64_2;
         v1 = Long.rotateLeft(v1, 31);
         v1 *= PRIME64_1;
 
-        v2 += mb.getLong(offset + 8) * PRIME64_2;
+        v2 += Platform.getLong(base, offset + 8) * PRIME64_2;
         v2 = Long.rotateLeft(v2, 31);
         v2 *= PRIME64_1;
 
-        v3 += mb.getLong(offset + 16) * PRIME64_2;
+        v3 += Platform.getLong(base, offset + 16) * PRIME64_2;
         v3 = Long.rotateLeft(v3, 31);
         v3 *= PRIME64_1;
 
-        v4 += mb.getLong(offset + 24) * PRIME64_2;
+        v4 += Platform.getLong(base, offset + 24) * PRIME64_2;
         v4 = Long.rotateLeft(v4, 31);
         v4 *= PRIME64_1;
 
@@ -190,9 +183,9 @@ public final class XXH64 {
 
     hash += length;
 
-    long limit = length - 8;
+    long limit = end - 8;
     while (offset <= limit) {
-      long k1 = mb.getLong(offset);
+      long k1 = Platform.getLong(base, offset);
       hash ^= Long.rotateLeft(k1 * PRIME64_2, 31) * PRIME64_1;
       hash = Long.rotateLeft(hash, 27) * PRIME64_1 + PRIME64_4;
       offset += 8L;

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java
index f8000d7..f0f66ba 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java
@@ -19,8 +19,6 @@ package org.apache.spark.sql.catalyst.expressions.codegen;
 
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
-import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
-import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -31,34 +29,43 @@ public class UTF8StringBuilder {
 
   private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
-  private ByteArrayMemoryBlock buffer;
-  private int length = 0;
+  private byte[] buffer;
+  private int cursor = Platform.BYTE_ARRAY_OFFSET;
 
   public UTF8StringBuilder() {
     // Since initial buffer size is 16 in `StringBuilder`, we set the same size here
-    this.buffer = new ByteArrayMemoryBlock(16);
+    this.buffer = new byte[16];
   }
 
   // Grows the buffer by at least `neededSize`
   private void grow(int neededSize) {
-    if (neededSize > ARRAY_MAX - length) {
+    if (neededSize > ARRAY_MAX - totalSize()) {
       throw new UnsupportedOperationException(
         "Cannot grow internal buffer by size " + neededSize + " because the size after growing " +
           "exceeds size limitation " + ARRAY_MAX);
     }
-    final int requestedSize = length + neededSize;
-    if (buffer.size() < requestedSize) {
-      int newLength = requestedSize < ARRAY_MAX / 2 ? requestedSize * 2 : ARRAY_MAX;
-      final ByteArrayMemoryBlock tmp = new ByteArrayMemoryBlock(newLength);
-      MemoryBlock.copyMemory(buffer, tmp, length);
+    final int length = totalSize() + neededSize;
+    if (buffer.length < length) {
+      int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
+      final byte[] tmp = new byte[newLength];
+      Platform.copyMemory(
+        buffer,
+        Platform.BYTE_ARRAY_OFFSET,
+        tmp,
+        Platform.BYTE_ARRAY_OFFSET,
+        totalSize());
       buffer = tmp;
     }
   }
 
+  private int totalSize() {
+    return cursor - Platform.BYTE_ARRAY_OFFSET;
+  }
+
   public void append(UTF8String value) {
     grow(value.numBytes());
-    value.writeToMemory(buffer.getByteArray(), length + Platform.BYTE_ARRAY_OFFSET);
-    length += value.numBytes();
+    value.writeToMemory(buffer, cursor);
+    cursor += value.numBytes();
   }
 
   public void append(String value) {
@@ -66,6 +73,6 @@ public class UTF8StringBuilder {
   }
 
   public UTF8String build() {
-    return UTF8String.fromBytes(buffer.getByteArray(), 0, length);
+    return UTF8String.fromBytes(buffer, 0, totalSize());
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index a754e87..742a4f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.hash.Murmur3_x86_32
-import org.apache.spark.unsafe.memory.MemoryBlock
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -362,7 +361,10 @@ abstract class HashExpression[E] extends Expression {
   }
 
   protected def genHashString(input: String, result: String): String = {
-    s"$result = $hasherClassName.hashUTF8String($input, $result);"
+    val baseObject = s"$input.getBaseObject()"
+    val baseOffset = s"$input.getBaseOffset()"
+    val numBytes = s"$input.numBytes()"
+    s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, $result);"
   }
 
   protected def genHashForMap(
@@ -469,8 +471,6 @@ abstract class InterpretedHashFunction {
 
   protected def hashUnsafeBytes(base: AnyRef, offset: Long, length: Int, seed: Long): Long
 
-  protected def hashUnsafeBytesBlock(base: MemoryBlock, seed: Long): Long
-
   /**
    * Computes hash of a given `value` of type `dataType`. The caller needs to check the validity
    * of input `value`.
@@ -496,7 +496,8 @@ abstract class InterpretedHashFunction {
       case c: CalendarInterval => hashInt(c.months, hashLong(c.microseconds, seed))
       case a: Array[Byte] =>
         hashUnsafeBytes(a, Platform.BYTE_ARRAY_OFFSET, a.length, seed)
-      case s: UTF8String => hashUnsafeBytesBlock(s.getMemoryBlock(), seed)
+      case s: UTF8String =>
+        hashUnsafeBytes(s.getBaseObject, s.getBaseOffset, s.numBytes(), seed)
 
       case array: ArrayData =>
         val elementType = dataType match {
@@ -583,15 +584,9 @@ object Murmur3HashFunction extends InterpretedHashFunction {
     Murmur3_x86_32.hashLong(l, seed.toInt)
   }
 
-  override protected def hashUnsafeBytes(
-      base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+  override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
     Murmur3_x86_32.hashUnsafeBytes(base, offset, len, seed.toInt)
   }
-
-  override protected def hashUnsafeBytesBlock(
-      base: MemoryBlock, seed: Long): Long = {
-    Murmur3_x86_32.hashUnsafeBytesBlock(base, seed.toInt)
-  }
 }
 
 /**
@@ -616,14 +611,9 @@ object XxHash64Function extends InterpretedHashFunction {
 
   override protected def hashLong(l: Long, seed: Long): Long = XXH64.hashLong(l, seed)
 
-  override protected def hashUnsafeBytes(
-      base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+  override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
     XXH64.hashUnsafeBytes(base, offset, len, seed)
   }
-
-  override protected def hashUnsafeBytesBlock(base: MemoryBlock, seed: Long): Long = {
-    XXH64.hashUnsafeBytesBlock(base, seed)
-  }
 }
 
 /**
@@ -730,7 +720,10 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
      """
 
   override protected def genHashString(input: String, result: String): String = {
-    s"$result = $hasherClassName.hashUTF8String($input);"
+    val baseObject = s"$input.getBaseObject()"
+    val baseOffset = s"$input.getBaseOffset()"
+    val numBytes = s"$input.numBytes()"
+    s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes);"
   }
 
   override protected def genHashForArray(
@@ -824,14 +817,10 @@ object HiveHashFunction extends InterpretedHashFunction {
     HiveHasher.hashLong(l)
   }
 
-  override protected def hashUnsafeBytes(
-      base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+  override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
     HiveHasher.hashUnsafeBytes(base, offset, len)
   }
 
-  override protected def hashUnsafeBytesBlock(
-    base: MemoryBlock, seed: Long): Long = HiveHasher.hashUnsafeBytesBlock(base)
-
   private val HIVE_DECIMAL_MAX_PRECISION = 38
   private val HIVE_DECIMAL_MAX_SCALE = 38
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
index 76930f9..b67c6f3 100644
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
@@ -17,8 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions;
 
-import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
-import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.Assert;
 import org.junit.Test;
@@ -54,7 +53,7 @@ public class HiveHasherSuite {
 
     for (int i = 0; i < inputs.length; i++) {
       UTF8String s = UTF8String.fromString("val_" + inputs[i]);
-      int hash = HiveHasher.hashUnsafeBytesBlock(s.getMemoryBlock());
+      int hash = HiveHasher.hashUnsafeBytes(s.getBaseObject(), s.getBaseOffset(), s.numBytes());
       Assert.assertEquals(expected[i], ((31 * inputs[i]) + hash));
     }
   }
@@ -90,13 +89,13 @@ public class HiveHasherSuite {
       int byteArrSize = rand.nextInt(100) * 8;
       byte[] bytes = new byte[byteArrSize];
       rand.nextBytes(bytes);
-      MemoryBlock mb = ByteArrayMemoryBlock.fromArray(bytes);
 
       Assert.assertEquals(
-          HiveHasher.hashUnsafeBytesBlock(mb),
-          HiveHasher.hashUnsafeBytesBlock(mb));
+          HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
+          HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
 
-      hashcodes.add(HiveHasher.hashUnsafeBytesBlock(mb));
+      hashcodes.add(HiveHasher.hashUnsafeBytes(
+          bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
     }
 
     // A very loose bound.
@@ -113,13 +112,13 @@ public class HiveHasherSuite {
       byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
       byte[] paddedBytes = new byte[byteArrSize];
       System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length);
-      MemoryBlock mb = ByteArrayMemoryBlock.fromArray(paddedBytes);
 
       Assert.assertEquals(
-          HiveHasher.hashUnsafeBytesBlock(mb),
-          HiveHasher.hashUnsafeBytesBlock(mb));
+          HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
+          HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
 
-      hashcodes.add(HiveHasher.hashUnsafeBytesBlock(mb));
+      hashcodes.add(HiveHasher.hashUnsafeBytes(
+          paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
     }
 
     // A very loose bound.

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java
index cd8bce6..1baee91 100644
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java
@@ -24,8 +24,6 @@ import java.util.Random;
 import java.util.Set;
 
 import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
-import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -144,13 +142,13 @@ public class XXH64Suite {
       int byteArrSize = rand.nextInt(100) * 8;
       byte[] bytes = new byte[byteArrSize];
       rand.nextBytes(bytes);
-      MemoryBlock mb = ByteArrayMemoryBlock.fromArray(bytes);
 
       Assert.assertEquals(
-              hasher.hashUnsafeWordsBlock(mb),
-              hasher.hashUnsafeWordsBlock(mb));
+              hasher.hashUnsafeWords(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
+              hasher.hashUnsafeWords(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
 
-      hashcodes.add(hasher.hashUnsafeWordsBlock(mb));
+      hashcodes.add(hasher.hashUnsafeWords(
+              bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
     }
 
     // A very loose bound.
@@ -167,13 +165,13 @@ public class XXH64Suite {
       byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
       byte[] paddedBytes = new byte[byteArrSize];
       System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length);
-      MemoryBlock mb = ByteArrayMemoryBlock.fromArray(paddedBytes);
 
       Assert.assertEquals(
-              hasher.hashUnsafeWordsBlock(mb),
-              hasher.hashUnsafeWordsBlock(mb));
+              hasher.hashUnsafeWords(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
+              hasher.hashUnsafeWords(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
 
-      hashcodes.add(hasher.hashUnsafeWordsBlock(mb));
+      hashcodes.add(hasher.hashUnsafeWords(
+              paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
     }
 
     // A very loose bound.

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilderSuite.scala
deleted file mode 100644
index 1b25a4b..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilderSuite.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions.codegen
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.unsafe.types.UTF8String
-
-class UTF8StringBuilderSuite extends SparkFunSuite {
-
-  test("basic test") {
-    val sb = new UTF8StringBuilder()
-    assert(sb.build() === UTF8String.EMPTY_UTF8)
-
-    sb.append("")
-    assert(sb.build() === UTF8String.EMPTY_UTF8)
-
-    sb.append("abcd")
-    assert(sb.build() === UTF8String.fromString("abcd"))
-
-    sb.append(UTF8String.fromString("1234"))
-    assert(sb.build() === UTF8String.fromString("abcd1234"))
-
-    // expect to grow an internal buffer
-    sb.append(UTF8String.fromString("efgijk567890"))
-    assert(sb.build() === UTF8String.fromString("abcd1234efgijk567890"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 6fdadde..5e0cf7d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.memory.OffHeapMemoryBlock;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -207,7 +206,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
 
   @Override
   protected UTF8String getBytesAsUTF8String(int rowId, int count) {
-    return new UTF8String(new OffHeapMemoryBlock(data + rowId, count));
+    return UTF8String.fromAddress(null, data + rowId, count);
   }
 
   //

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
index 1c9beda..5f58b03 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
@@ -25,7 +25,6 @@ import org.apache.arrow.vector.holders.NullableVarCharHolder;
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.execution.arrow.ArrowUtils;
 import org.apache.spark.sql.types.*;
-import org.apache.spark.unsafe.memory.OffHeapMemoryBlock;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -378,10 +377,9 @@ public final class ArrowColumnVector extends ColumnVector {
       if (stringResult.isSet == 0) {
         return null;
       } else {
-        return new UTF8String(new OffHeapMemoryBlock(
+        return UTF8String.fromAddress(null,
           stringResult.buffer.memoryAddress() + stringResult.start,
-          stringResult.end - stringResult.start
-        ));
+          stringResult.end - stringResult.start);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
index 470b93e..50ae26a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.benchmark
 import java.util.{Arrays, Comparator}
 
 import org.apache.spark.unsafe.array.LongArray
-import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
+import org.apache.spark.unsafe.memory.MemoryBlock
 import org.apache.spark.util.Benchmark
 import org.apache.spark.util.collection.Sorter
 import org.apache.spark.util.collection.unsafe.sort._
@@ -36,7 +36,7 @@ import org.apache.spark.util.random.XORShiftRandom
 class SortBenchmark extends BenchmarkBase {
 
   private def referenceKeyPrefixSort(buf: LongArray, lo: Int, hi: Int, refCmp: PrefixComparator) {
-    val sortBuffer = new LongArray(new OnHeapMemoryBlock(buf.size() * 8L))
+    val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt)))
     new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort(
       buf, lo, hi, new Comparator[RecordPointerAndKeyPrefix] {
         override def compare(
@@ -50,8 +50,8 @@ class SortBenchmark extends BenchmarkBase {
   private def generateKeyPrefixTestData(size: Int, rand: => Long): (LongArray, LongArray) = {
     val ref = Array.tabulate[Long](size * 2) { i => rand }
     val extended = ref ++ Array.fill[Long](size * 2)(0)
-    (new LongArray(OnHeapMemoryBlock.fromArray(ref)),
-      new LongArray(OnHeapMemoryBlock.fromArray(extended)))
+    (new LongArray(MemoryBlock.fromLongArray(ref)),
+      new LongArray(MemoryBlock.fromLongArray(extended)))
   }
 
   ignore("sort") {
@@ -60,7 +60,7 @@ class SortBenchmark extends BenchmarkBase {
     val benchmark = new Benchmark("radix sort " + size, size)
     benchmark.addTimerCase("reference TimSort key prefix array") { timer =>
       val array = Array.tabulate[Long](size * 2) { i => rand.nextLong }
-      val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
+      val buf = new LongArray(MemoryBlock.fromLongArray(array))
       timer.startTiming()
       referenceKeyPrefixSort(buf, 0, size, PrefixComparators.BINARY)
       timer.stopTiming()
@@ -78,7 +78,7 @@ class SortBenchmark extends BenchmarkBase {
         array(i) = rand.nextLong & 0xff
         i += 1
       }
-      val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
+      val buf = new LongArray(MemoryBlock.fromLongArray(array))
       timer.startTiming()
       RadixSort.sort(buf, size, 0, 7, false, false)
       timer.stopTiming()
@@ -90,7 +90,7 @@ class SortBenchmark extends BenchmarkBase {
         array(i) = rand.nextLong & 0xffff
         i += 1
       }
-      val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
+      val buf = new LongArray(MemoryBlock.fromLongArray(array))
       timer.startTiming()
       RadixSort.sort(buf, size, 0, 7, false, false)
       timer.stopTiming()
@@ -102,7 +102,7 @@ class SortBenchmark extends BenchmarkBase {
         array(i) = rand.nextLong
         i += 1
       }
-      val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
+      val buf = new LongArray(MemoryBlock.fromLongArray(array))
       timer.startTiming()
       RadixSort.sort(buf, size, 0, 7, false, false)
       timer.stopTiming()

http://git-wip-us.apache.org/repos/asf/spark/blob/a00a160e/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
index 25ee95d..ffda33c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
@@ -22,13 +22,13 @@ import java.io.File
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.memory.{MemoryManager, TaskMemoryManager, TestMemoryManager}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
+import org.apache.spark.unsafe.memory.MemoryBlock
 import org.apache.spark.util.Utils
 
 class RowQueueSuite extends SparkFunSuite {
 
   test("in-memory queue") {
-    val page = new OnHeapMemoryBlock((1<<10) * 8L)
+    val page = MemoryBlock.fromLongArray(new Array[Long](1<<10))
     val queue = new InMemoryRowQueue(page, 1) {
       override def close() {}
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message