spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-24452][SQL][CORE] Avoid possible overflow in int add or multiple
Date Fri, 15 Jun 2018 22:05:53 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 a7d378e78 -> d42610440


[SPARK-24452][SQL][CORE] Avoid possible overflow in int add or multiple

This PR fixes possible overflow in int add or multiply. In particular, their overflows in
multiply are detected by [Spotbugs](https://spotbugs.github.io/)

The following assignments may cause overflow in right hand side. As a result, the result may
be negative.
```
long = int * int
long = int + int
```

To avoid this problem, this PR performs cast from int to long in right hand side.

Existing UTs.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21481 from kiszk/SPARK-24452.

(cherry picked from commit 90da7dc241f8eec2348c0434312c97c116330bc4)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4261044
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4261044
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4261044

Branch: refs/heads/branch-2.3
Commit: d42610440ac2e58ef77fcf42ad81ee4fdf5691ba
Parents: a7d378e
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Authored: Fri Jun 15 13:47:48 2018 -0700
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Fri Jun 15 13:49:04 2018 -0700

----------------------------------------------------------------------
 .../spark/unsafe/map/BytesToBytesMap.java       |   2 +-
 .../spark/deploy/worker/DriverRunner.scala      |   2 +-
 .../org/apache/spark/rdd/AsyncRDDActions.scala  |   2 +-
 .../org/apache/spark/storage/BlockManager.scala |   2 +-
 .../catalyst/expressions/UnsafeArrayData.java   |  14 +--
 .../VariableLengthRowBasedKeyValueBatch.java    |   2 +-
 .../vectorized/OffHeapColumnVector.java         | 106 +++++++++----------
 .../vectorized/OnHeapColumnVector.java          |  10 +-
 .../apache/spark/sql/hive/client/HiveShim.scala |   2 +-
 .../streaming/util/FileBasedWriteAheadLog.scala |   2 +-
 10 files changed, 72 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d4261044/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5f00455..9a767dd 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -703,7 +703,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
       // must be stored in the same memory page.
       // (8 byte key length) (key) (value) (8 byte pointer to next value)
       int uaoSize = UnsafeAlignedOffset.getUaoSize();
-      final long recordLength = (2 * uaoSize) + klen + vlen + 8;
+      final long recordLength = (2L * uaoSize) + klen + vlen + 8;
       if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
         if (!acquireNewPage(recordLength + uaoSize)) {
           return false;

http://git-wip-us.apache.org/repos/asf/spark/blob/d4261044/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 58a1811..a6d13d1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -225,7 +225,7 @@ private[deploy] class DriverRunner(
       // check if attempting another run
       keepTrying = supervise && exitCode != 0 && !killed
       if (keepTrying) {
-        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
+        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000L) {
           waitSeconds = 1
         }
         logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")

http://git-wip-us.apache.org/repos/asf/spark/blob/d4261044/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index c9ed12f..47669a0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -95,7 +95,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with
Loggi
             // the left side of max is >=1 whenever partsScanned >= 2
             numPartsToTry = Math.max(1,
               (1.5 * num * partsScanned / results.size).toInt - partsScanned)
-            numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
+            numPartsToTry = Math.min(numPartsToTry, partsScanned * 4L)
           }
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d4261044/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e0276a4..df1a4be 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -291,7 +291,7 @@ private[spark] class BlockManager(
         case e: Exception if i < MAX_ATTEMPTS =>
           logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS
- i}"
             + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
-          Thread.sleep(SLEEP_TIME_SECS * 1000)
+          Thread.sleep(SLEEP_TIME_SECS * 1000L)
         case NonFatal(e) =>
           throw new SparkException("Unable to register with external shuffle server due to
: " +
             e.getMessage, e)

http://git-wip-us.apache.org/repos/asf/spark/blob/d4261044/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
index d18542b..bf7b98a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
@@ -72,7 +72,7 @@ public final class UnsafeArrayData extends ArrayData {
   private long elementOffset;
 
   private long getElementOffset(int ordinal, int elementSize) {
-    return elementOffset + ordinal * elementSize;
+    return elementOffset + ordinal * (long)elementSize;
   }
 
   public Object getBaseObject() { return baseObject; }
@@ -402,7 +402,7 @@ public final class UnsafeArrayData extends ArrayData {
   public short[] toShortArray() {
     short[] values = new short[numElements];
     Platform.copyMemory(
-      baseObject, elementOffset, values, Platform.SHORT_ARRAY_OFFSET, numElements * 2);
+      baseObject, elementOffset, values, Platform.SHORT_ARRAY_OFFSET, numElements * 2L);
     return values;
   }
 
@@ -410,7 +410,7 @@ public final class UnsafeArrayData extends ArrayData {
   public int[] toIntArray() {
     int[] values = new int[numElements];
     Platform.copyMemory(
-      baseObject, elementOffset, values, Platform.INT_ARRAY_OFFSET, numElements * 4);
+      baseObject, elementOffset, values, Platform.INT_ARRAY_OFFSET, numElements * 4L);
     return values;
   }
 
@@ -418,7 +418,7 @@ public final class UnsafeArrayData extends ArrayData {
   public long[] toLongArray() {
     long[] values = new long[numElements];
     Platform.copyMemory(
-      baseObject, elementOffset, values, Platform.LONG_ARRAY_OFFSET, numElements * 8);
+      baseObject, elementOffset, values, Platform.LONG_ARRAY_OFFSET, numElements * 8L);
     return values;
   }
 
@@ -426,7 +426,7 @@ public final class UnsafeArrayData extends ArrayData {
   public float[] toFloatArray() {
     float[] values = new float[numElements];
     Platform.copyMemory(
-      baseObject, elementOffset, values, Platform.FLOAT_ARRAY_OFFSET, numElements * 4);
+      baseObject, elementOffset, values, Platform.FLOAT_ARRAY_OFFSET, numElements * 4L);
     return values;
   }
 
@@ -434,14 +434,14 @@ public final class UnsafeArrayData extends ArrayData {
   public double[] toDoubleArray() {
     double[] values = new double[numElements];
     Platform.copyMemory(
-      baseObject, elementOffset, values, Platform.DOUBLE_ARRAY_OFFSET, numElements * 8);
+      baseObject, elementOffset, values, Platform.DOUBLE_ARRAY_OFFSET, numElements * 8L);
     return values;
   }
 
   private static UnsafeArrayData fromPrimitiveArray(
        Object arr, int offset, int length, int elementSize) {
     final long headerInBytes = calculateHeaderPortionInBytes(length);
-    final long valueRegionInBytes = elementSize * length;
+    final long valueRegionInBytes = (long)elementSize * length;
     final long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
     if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
       throw new UnsupportedOperationException("Cannot convert this array to unsafe format
as " +

http://git-wip-us.apache.org/repos/asf/spark/blob/d4261044/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
index 905e682..c823de4 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
@@ -41,7 +41,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB
   @Override
   public UnsafeRow appendRow(Object kbase, long koff, int klen,
                              Object vbase, long voff, int vlen) {
-    final long recordLength = 8 + klen + vlen + 8;
+    final long recordLength = 8L + klen + vlen + 8;
     // if run out of max supported rows or page size, return null
     if (numRows >= capacity || page == null || page.size() - pageCursor < recordLength)
{
       return null;

http://git-wip-us.apache.org/repos/asf/spark/blob/d4261044/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 754c265..5e0cf7d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -215,12 +215,12 @@ public final class OffHeapColumnVector extends WritableColumnVector
{
 
   @Override
   public void putShort(int rowId, short value) {
-    Platform.putShort(null, data + 2 * rowId, value);
+    Platform.putShort(null, data + 2L * rowId, value);
   }
 
   @Override
   public void putShorts(int rowId, int count, short value) {
-    long offset = data + 2 * rowId;
+    long offset = data + 2L * rowId;
     for (int i = 0; i < count; ++i, offset += 2) {
       Platform.putShort(null, offset, value);
     }
@@ -228,20 +228,20 @@ public final class OffHeapColumnVector extends WritableColumnVector
{
 
   @Override
   public void putShorts(int rowId, int count, short[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.SHORT_ARRAY_OFFSET + srcIndex * 2,
-        null, data + 2 * rowId, count * 2);
+    Platform.copyMemory(src, Platform.SHORT_ARRAY_OFFSET + srcIndex * 2L,
+        null, data + 2L * rowId, count * 2L);
   }
 
   @Override
   public void putShorts(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-      null, data + rowId * 2, count * 2);
+      null, data + rowId * 2L, count * 2L);
   }
 
   @Override
   public short getShort(int rowId) {
     if (dictionary == null) {
-      return Platform.getShort(null, data + 2 * rowId);
+      return Platform.getShort(null, data + 2L * rowId);
     } else {
       return (short) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
     }
@@ -251,7 +251,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
   public short[] getShorts(int rowId, int count) {
     assert(dictionary == null);
     short[] array = new short[count];
-    Platform.copyMemory(null, data + rowId * 2, array, Platform.SHORT_ARRAY_OFFSET, count
* 2);
+    Platform.copyMemory(null, data + rowId * 2L, array, Platform.SHORT_ARRAY_OFFSET, count
* 2L);
     return array;
   }
 
@@ -261,12 +261,12 @@ public final class OffHeapColumnVector extends WritableColumnVector
{
 
   @Override
   public void putInt(int rowId, int value) {
-    Platform.putInt(null, data + 4 * rowId, value);
+    Platform.putInt(null, data + 4L * rowId, value);
   }
 
   @Override
   public void putInts(int rowId, int count, int value) {
-    long offset = data + 4 * rowId;
+    long offset = data + 4L * rowId;
     for (int i = 0; i < count; ++i, offset += 4) {
       Platform.putInt(null, offset, value);
     }
@@ -274,24 +274,24 @@ public final class OffHeapColumnVector extends WritableColumnVector
{
 
   @Override
   public void putInts(int rowId, int count, int[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.INT_ARRAY_OFFSET + srcIndex * 4,
-        null, data + 4 * rowId, count * 4);
+    Platform.copyMemory(src, Platform.INT_ARRAY_OFFSET + srcIndex * 4L,
+        null, data + 4L * rowId, count * 4L);
   }
 
   @Override
   public void putInts(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-      null, data + rowId * 4, count * 4);
+      null, data + rowId * 4L, count * 4L);
   }
 
   @Override
   public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
     if (!bigEndianPlatform) {
       Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
-          null, data + 4 * rowId, count * 4);
+          null, data + 4L * rowId, count * 4L);
     } else {
       int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
-      long offset = data + 4 * rowId;
+      long offset = data + 4L * rowId;
       for (int i = 0; i < count; ++i, offset += 4, srcOffset += 4) {
         Platform.putInt(null, offset,
             java.lang.Integer.reverseBytes(Platform.getInt(src, srcOffset)));
@@ -302,7 +302,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
   @Override
   public int getInt(int rowId) {
     if (dictionary == null) {
-      return Platform.getInt(null, data + 4 * rowId);
+      return Platform.getInt(null, data + 4L * rowId);
     } else {
       return dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
     }
@@ -312,7 +312,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
   public int[] getInts(int rowId, int count) {
     assert(dictionary == null);
     int[] array = new int[count];
-    Platform.copyMemory(null, data + rowId * 4, array, Platform.INT_ARRAY_OFFSET, count *
4);
+    Platform.copyMemory(null, data + rowId * 4L, array, Platform.INT_ARRAY_OFFSET, count
* 4L);
     return array;
   }
 
@@ -324,7 +324,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
   public int getDictId(int rowId) {
     assert(dictionary == null)
             : "A ColumnVector dictionary should not have a dictionary for itself.";
-    return Platform.getInt(null, data + 4 * rowId);
+    return Platform.getInt(null, data + 4L * rowId);
   }
 
   //
@@ -333,12 +333,12 @@ public final class OffHeapColumnVector extends WritableColumnVector
{
 
   @Override
   public void putLong(int rowId, long value) {
-    Platform.putLong(null, data + 8 * rowId, value);
+    Platform.putLong(null, data + 8L * rowId, value);
   }
 
   @Override
   public void putLongs(int rowId, int count, long value) {
-    long offset = data + 8 * rowId;
+    long offset = data + 8L * rowId;
     for (int i = 0; i < count; ++i, offset += 8) {
       Platform.putLong(null, offset, value);
     }
@@ -346,24 +346,24 @@ public final class OffHeapColumnVector extends WritableColumnVector
{
 
   @Override
   public void putLongs(int rowId, int count, long[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.LONG_ARRAY_OFFSET + srcIndex * 8,
-        null, data + 8 * rowId, count * 8);
+    Platform.copyMemory(src, Platform.LONG_ARRAY_OFFSET + srcIndex * 8L,
+        null, data + 8L * rowId, count * 8L);
   }
 
   @Override
   public void putLongs(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-      null, data + rowId * 8, count * 8);
+      null, data + rowId * 8L, count * 8L);
   }
 
   @Override
   public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
     if (!bigEndianPlatform) {
       Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
-          null, data + 8 * rowId, count * 8);
+          null, data + 8L * rowId, count * 8L);
     } else {
       int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
-      long offset = data + 8 * rowId;
+      long offset = data + 8L * rowId;
       for (int i = 0; i < count; ++i, offset += 8, srcOffset += 8) {
         Platform.putLong(null, offset,
             java.lang.Long.reverseBytes(Platform.getLong(src, srcOffset)));
@@ -374,7 +374,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
   @Override
   public long getLong(int rowId) {
     if (dictionary == null) {
-      return Platform.getLong(null, data + 8 * rowId);
+      return Platform.getLong(null, data + 8L * rowId);
     } else {
       return dictionary.decodeToLong(dictionaryIds.getDictId(rowId));
     }
@@ -384,7 +384,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
   public long[] getLongs(int rowId, int count) {
     assert(dictionary == null);
     long[] array = new long[count];
-    Platform.copyMemory(null, data + rowId * 8, array, Platform.LONG_ARRAY_OFFSET, count
* 8);
+    Platform.copyMemory(null, data + rowId * 8L, array, Platform.LONG_ARRAY_OFFSET, count
* 8L);
     return array;
   }
 
@@ -394,12 +394,12 @@ public final class OffHeapColumnVector extends WritableColumnVector
{
 
   @Override
   public void putFloat(int rowId, float value) {
-    Platform.putFloat(null, data + rowId * 4, value);
+    Platform.putFloat(null, data + rowId * 4L, value);
   }
 
   @Override
   public void putFloats(int rowId, int count, float value) {
-    long offset = data + 4 * rowId;
+    long offset = data + 4L * rowId;
     for (int i = 0; i < count; ++i, offset += 4) {
       Platform.putFloat(null, offset, value);
     }
@@ -407,18 +407,18 @@ public final class OffHeapColumnVector extends WritableColumnVector
{
 
   @Override
   public void putFloats(int rowId, int count, float[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.FLOAT_ARRAY_OFFSET + srcIndex * 4,
-        null, data + 4 * rowId, count * 4);
+    Platform.copyMemory(src, Platform.FLOAT_ARRAY_OFFSET + srcIndex * 4L,
+        null, data + 4L * rowId, count * 4L);
   }
 
   @Override
   public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
     if (!bigEndianPlatform) {
       Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-          null, data + rowId * 4, count * 4);
+          null, data + rowId * 4L, count * 4L);
     } else {
       ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
-      long offset = data + 4 * rowId;
+      long offset = data + 4L * rowId;
       for (int i = 0; i < count; ++i, offset += 4) {
         Platform.putFloat(null, offset, bb.getFloat(srcIndex + (4 * i)));
       }
@@ -428,7 +428,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
   @Override
   public float getFloat(int rowId) {
     if (dictionary == null) {
-      return Platform.getFloat(null, data + rowId * 4);
+      return Platform.getFloat(null, data + rowId * 4L);
     } else {
       return dictionary.decodeToFloat(dictionaryIds.getDictId(rowId));
     }
@@ -438,7 +438,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
   public float[] getFloats(int rowId, int count) {
     assert(dictionary == null);
     float[] array = new float[count];
-    Platform.copyMemory(null, data + rowId * 4, array, Platform.FLOAT_ARRAY_OFFSET, count
* 4);
+    Platform.copyMemory(null, data + rowId * 4L, array, Platform.FLOAT_ARRAY_OFFSET, count
* 4L);
     return array;
   }
 
@@ -449,12 +449,12 @@ public final class OffHeapColumnVector extends WritableColumnVector
{
 
   @Override
   public void putDouble(int rowId, double value) {
-    Platform.putDouble(null, data + rowId * 8, value);
+    Platform.putDouble(null, data + rowId * 8L, value);
   }
 
   @Override
   public void putDoubles(int rowId, int count, double value) {
-    long offset = data + 8 * rowId;
+    long offset = data + 8L * rowId;
     for (int i = 0; i < count; ++i, offset += 8) {
       Platform.putDouble(null, offset, value);
     }
@@ -462,18 +462,18 @@ public final class OffHeapColumnVector extends WritableColumnVector
{
 
   @Override
   public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.DOUBLE_ARRAY_OFFSET + srcIndex * 8,
-      null, data + 8 * rowId, count * 8);
+    Platform.copyMemory(src, Platform.DOUBLE_ARRAY_OFFSET + srcIndex * 8L,
+      null, data + 8L * rowId, count * 8L);
   }
 
   @Override
   public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
     if (!bigEndianPlatform) {
       Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-        null, data + rowId * 8, count * 8);
+        null, data + rowId * 8L, count * 8L);
     } else {
       ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
-      long offset = data + 8 * rowId;
+      long offset = data + 8L * rowId;
       for (int i = 0; i < count; ++i, offset += 8) {
         Platform.putDouble(null, offset, bb.getDouble(srcIndex + (8 * i)));
       }
@@ -483,7 +483,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
   @Override
   public double getDouble(int rowId) {
     if (dictionary == null) {
-      return Platform.getDouble(null, data + rowId * 8);
+      return Platform.getDouble(null, data + rowId * 8L);
     } else {
       return dictionary.decodeToDouble(dictionaryIds.getDictId(rowId));
     }
@@ -493,7 +493,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
   public double[] getDoubles(int rowId, int count) {
     assert(dictionary == null);
     double[] array = new double[count];
-    Platform.copyMemory(null, data + rowId * 8, array, Platform.DOUBLE_ARRAY_OFFSET, count
* 8);
+    Platform.copyMemory(null, data + rowId * 8L, array, Platform.DOUBLE_ARRAY_OFFSET, count
* 8L);
     return array;
   }
 
@@ -503,26 +503,26 @@ public final class OffHeapColumnVector extends WritableColumnVector
{
   @Override
   public void putArray(int rowId, int offset, int length) {
     assert(offset >= 0 && offset + length <= childColumns[0].capacity);
-    Platform.putInt(null, lengthData + 4 * rowId, length);
-    Platform.putInt(null, offsetData + 4 * rowId, offset);
+    Platform.putInt(null, lengthData + 4L * rowId, length);
+    Platform.putInt(null, offsetData + 4L * rowId, offset);
   }
 
   @Override
   public int getArrayLength(int rowId) {
-    return Platform.getInt(null, lengthData + 4 * rowId);
+    return Platform.getInt(null, lengthData + 4L * rowId);
   }
 
   @Override
   public int getArrayOffset(int rowId) {
-    return Platform.getInt(null, offsetData + 4 * rowId);
+    return Platform.getInt(null, offsetData + 4L * rowId);
   }
 
   // APIs dealing with ByteArrays
   @Override
   public int putByteArray(int rowId, byte[] value, int offset, int length) {
     int result = arrayData().appendBytes(length, value, offset);
-    Platform.putInt(null, lengthData + 4 * rowId, length);
-    Platform.putInt(null, offsetData + 4 * rowId, result);
+    Platform.putInt(null, lengthData + 4L * rowId, length);
+    Platform.putInt(null, offsetData + 4L * rowId, result);
     return result;
   }
 
@@ -532,19 +532,19 @@ public final class OffHeapColumnVector extends WritableColumnVector
{
     int oldCapacity = (nulls == 0L) ? 0 : capacity;
     if (isArray() || type instanceof MapType) {
       this.lengthData =
-          Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);
+          Platform.reallocateMemory(lengthData, oldCapacity * 4L, newCapacity * 4L);
       this.offsetData =
-          Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 4);
+          Platform.reallocateMemory(offsetData, oldCapacity * 4L, newCapacity * 4L);
     } else if (type instanceof ByteType || type instanceof BooleanType) {
       this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity);
     } else if (type instanceof ShortType) {
-      this.data = Platform.reallocateMemory(data, oldCapacity * 2, newCapacity * 2);
+      this.data = Platform.reallocateMemory(data, oldCapacity * 2L, newCapacity * 2L);
     } else if (type instanceof IntegerType || type instanceof FloatType ||
         type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
-      this.data = Platform.reallocateMemory(data, oldCapacity * 4, newCapacity * 4);
+      this.data = Platform.reallocateMemory(data, oldCapacity * 4L, newCapacity * 4L);
     } else if (type instanceof LongType || type instanceof DoubleType ||
         DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) {
-      this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8);
+      this.data = Platform.reallocateMemory(data, oldCapacity * 8L, newCapacity * 8L);
     } else if (childColumns != null) {
       // Nothing to store.
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/d4261044/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 23dcc10..577eab6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -231,7 +231,7 @@ public final class OnHeapColumnVector extends WritableColumnVector {
   @Override
   public void putShorts(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, shortData,
-      Platform.SHORT_ARRAY_OFFSET + rowId * 2, count * 2);
+      Platform.SHORT_ARRAY_OFFSET + rowId * 2L, count * 2L);
   }
 
   @Override
@@ -276,7 +276,7 @@ public final class OnHeapColumnVector extends WritableColumnVector {
   @Override
   public void putInts(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, intData,
-      Platform.INT_ARRAY_OFFSET + rowId * 4, count * 4);
+      Platform.INT_ARRAY_OFFSET + rowId * 4L, count * 4L);
   }
 
   @Override
@@ -342,7 +342,7 @@ public final class OnHeapColumnVector extends WritableColumnVector {
   @Override
   public void putLongs(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, longData,
-      Platform.LONG_ARRAY_OFFSET + rowId * 8, count * 8);
+      Platform.LONG_ARRAY_OFFSET + rowId * 8L, count * 8L);
   }
 
   @Override
@@ -394,7 +394,7 @@ public final class OnHeapColumnVector extends WritableColumnVector {
   public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
     if (!bigEndianPlatform) {
       Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, floatData,
-          Platform.DOUBLE_ARRAY_OFFSET + rowId * 4, count * 4);
+          Platform.DOUBLE_ARRAY_OFFSET + rowId * 4L, count * 4L);
     } else {
       ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
       for (int i = 0; i < count; ++i) {
@@ -443,7 +443,7 @@ public final class OnHeapColumnVector extends WritableColumnVector {
   public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
     if (!bigEndianPlatform) {
       Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData,
-          Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8);
+          Platform.DOUBLE_ARRAY_OFFSET + rowId * 8L, count * 8L);
     } else {
       ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
       for (int i = 0; i < count; ++i) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d4261044/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 1eac70d..60fe31f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -343,7 +343,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
   }
 
   override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
-    conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000
+    conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000L
   }
 
   override def loadPartition(

http://git-wip-us.apache.org/repos/asf/spark/blob/d4261044/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index ab7c855..2e85990 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -222,7 +222,7 @@ private[streaming] class FileBasedWriteAheadLog(
         pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
       }
       currentLogWriterStartTime = currentTime
-      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
+      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000L)
       val newLogPath = new Path(logDirectory,
         timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
       currentLogPath = Some(newLogPath.toString)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message