hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject svn commit: r1667814 - in /hive/trunk: ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/ serde/src/java/org/apache/hadoop/hive/serde2/
Date Thu, 19 Mar 2015 16:50:27 GMT
Author: sershe
Date: Thu Mar 19 16:50:26 2015
New Revision: 1667814

URL: http://svn.apache.org/r1667814
Log:
HIVE-9997 : minor tweaks for bytes mapjoin hash table (Sergey Shelukhin, reviewed by Mostafa Mokhtar, Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1667814&r1=1667813&r2=1667814&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java Thu Mar 19 16:50:26 2015
@@ -424,7 +424,7 @@ public class MapJoinBytesTableContainer
 
     public void setFromOutput(Output output) {
       if (refs == null) {
-        refs = new ArrayList<WriteBuffers.ByteSegmentRef>(0);
+        refs = new ArrayList<WriteBuffers.ByteSegmentRef>();
       }
       byte aliasFilter = hashMap.getValueRefs(output.getData(), output.getLength(), refs);
       this.aliasFilter = refs.isEmpty() ? (byte) 0xff : aliasFilter;
@@ -472,12 +472,15 @@ public class MapJoinBytesTableContainer
     @Override
     public List<Object> first() throws HiveException {
       currentRow = 0;
-      return next();
+      return nextInternal();
     }
 
-
     @Override
     public List<Object> next() throws HiveException {
+      return nextInternal();
+    }
+
+    private List<Object> nextInternal() throws HiveException {
       if (dummyRow != null) {
         List<Object> result = dummyRow;
         dummyRow = null;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java?rev=1667814&r1=1667813&r2=1667814&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java Thu Mar 19 16:50:26 2015
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -211,6 +212,11 @@ public class VectorMapJoinOperator exten
 
   @Override
   public void closeOp(boolean aborted) throws HiveException {
+    for (MapJoinTableContainer tableContainer : mapJoinTables) {
+      if (tableContainer != null) {
+        tableContainer.dumpMetrics();
+      }
+    }
     if (!aborted && 0 < outputBatch.size) {
       flushOutput();
     }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java?rev=1667814&r1=1667813&r2=1667814&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java Thu Mar 19 16:50:26 2015
@@ -165,7 +165,7 @@ public class TestBytesBytesMultiHashMap
 
     @Override
     public void writeKey(RandomAccessOutput dest) throws SerDeException {
-      lastKey += 465623573; // This number is certified to be random.
+      lastKey += 465623573;
       int len = LazyBinaryUtils.writeVLongToByteArray(buffer, lastKey);
       lastBuffer = Arrays.copyOf(buffer, len);
       keys.add(lastBuffer);

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java?rev=1667814&r1=1667813&r2=1667814&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java Thu Mar 19 16:50:26 2015
@@ -22,6 +22,8 @@ import java.io.IOException;
 
 import org.apache.hadoop.hive.common.io.NonSyncByteArrayInputStream;
 import org.apache.hadoop.hive.common.io.NonSyncByteArrayOutputStream;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 
 /**
  * Extensions to bytearrayinput/output streams.

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java?rev=1667814&r1=1667813&r2=1667814&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java Thu Mar 19 16:50:26 2015
@@ -21,8 +21,11 @@ package org.apache.hadoop.hive.serde2;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.hash.MurmurHash;
 
@@ -35,6 +38,8 @@ public final class WriteBuffers implemen
   private final ArrayList<byte[]> writeBuffers = new ArrayList<byte[]>(1);
   /** Buffer size in writeBuffers */
   private final int wbSize;
+  private final int wbSizeLog2;
+  private final long offsetMask;
   private final long maxSize;
 
   private byte[] currentWriteBuffer;
@@ -47,7 +52,9 @@ public final class WriteBuffers implemen
   private int currentReadOffset = 0;
 
   public WriteBuffers(int wbSize, long maxSize) {
-    this.wbSize = wbSize;
+    this.wbSize = Integer.bitCount(wbSize) == 1 ? wbSize : (Integer.highestOneBit(wbSize)
<< 1);
+    this.wbSizeLog2 = 31 - Integer.numberOfLeadingZeros(this.wbSize);
+    this.offsetMask = this.wbSize - 1;
     this.maxSize = maxSize;
     currentWriteBufferIndex = -1;
     nextBufferToWrite();
@@ -142,7 +149,7 @@ public final class WriteBuffers implemen
 
   @Override
   public void reserve(int byteCount) {
-    if (byteCount < 0) throw new AssertionError("byteCount must be positive");
+    if (byteCount < 0) throw new AssertionError("byteCount must be non-negative");
     int currentWriteOffset = this.currentWriteOffset + byteCount;
     while (currentWriteOffset > wbSize) {
       nextBufferToWrite();
@@ -190,11 +197,11 @@ public final class WriteBuffers implemen
   }
 
   private int getOffset(long offset) {
-    return (int)(offset % wbSize);
+    return (int)(offset & offsetMask);
   }
 
   private int getBufferIndex(long offset) {
-    return (int)(offset / wbSize);
+    return (int)(offset >>> wbSizeLog2);
   }
 
   private void nextBufferToWrite() {
@@ -283,11 +290,11 @@ public final class WriteBuffers implemen
   }
 
   public long getWritePoint() {
-    return (currentWriteBufferIndex * (long)wbSize) + currentWriteOffset;
+    return ((long)currentWriteBufferIndex << wbSizeLog2) + currentWriteOffset;
   }
 
   public long getReadPoint() {
-    return (currentReadBufferIndex * (long)wbSize) + currentReadOffset;
+    return ((long)currentReadBufferIndex << wbSizeLog2) + currentReadOffset;
   }
 
   public void writeVLong(long value) {
@@ -335,12 +342,17 @@ public final class WriteBuffers implemen
    */
   public static class ByteSegmentRef {
     public ByteSegmentRef(long offset, int length) {
+      reset(offset, length);
+    }
+    public void reset(long offset, int length) {
       if (length < 0) {
         throw new AssertionError("Length is negative: " + length);
       }
       this.offset = offset;
       this.length = length;
     }
+    public ByteSegmentRef() {
+    }
     public byte[] getBytes() {
       return bytes;
     }
@@ -367,27 +379,29 @@ public final class WriteBuffers implemen
    * spanning multiple internal buffers.
    */
   public void populateValue(WriteBuffers.ByteSegmentRef value) {
-    // At this point, we are going to make a copy if need to avoid array boundaries.
+    // At this point, we are going to make a copy if needed to avoid array boundaries.
     int index = getBufferIndex(value.getOffset());
     byte[] buffer = writeBuffers.get(index);
     int bufferOffset = getOffset(value.getOffset());
     int length = value.getLength();
     if (bufferOffset + length <= wbSize) {
+      // Common case - the segment is in one buffer.
       value.bytes = buffer;
       value.offset = bufferOffset;
-    } else {
-      value.bytes = new byte[length];
-      value.offset = 0;
-      int destOffset = 0;
-      while (destOffset < length) {
-        if (destOffset > 0) {
-          buffer = writeBuffers.get(++index);
-          bufferOffset = 0;
-        }
-        int toCopy = Math.min(length - destOffset, wbSize - bufferOffset);
-        System.arraycopy(buffer, bufferOffset, value.bytes, destOffset, toCopy);
-        destOffset += toCopy;
-      }
+      return;
+    }
+    // Special case (rare) - the segment is on buffer boundary.
+    value.bytes = new byte[length];
+    value.offset = 0;
+    int destOffset = 0;
+    while (destOffset < length) {
+      if (destOffset > 0) {
+        buffer = writeBuffers.get(++index);
+        bufferOffset = 0;
+      }
+      int toCopy = Math.min(length - destOffset, wbSize - bufferOffset);
+      System.arraycopy(buffer, bufferOffset, value.bytes, destOffset, toCopy);
+      destOffset += toCopy;
     }
   }
 
@@ -437,11 +451,12 @@ public final class WriteBuffers implemen
     int prevIndex = currentWriteBufferIndex, prevOffset = currentWriteOffset;
     setWritePoint(offset);
     if (isAllInOneWriteBuffer(5)) {
-      currentWriteBuffer[currentWriteOffset++] = (byte)(v >>> 32);
-      currentWriteBuffer[currentWriteOffset++] = (byte)(v >>> 24);
-      currentWriteBuffer[currentWriteOffset++] = (byte)(v >>> 16);
-      currentWriteBuffer[currentWriteOffset++] = (byte)(v >>> 8);
-      currentWriteBuffer[currentWriteOffset] = (byte)(v);
+      currentWriteBuffer[currentWriteOffset] = (byte)(v >>> 32);
+      currentWriteBuffer[currentWriteOffset + 1] = (byte)(v >>> 24);
+      currentWriteBuffer[currentWriteOffset + 2] = (byte)(v >>> 16);
+      currentWriteBuffer[currentWriteOffset + 3] = (byte)(v >>> 8);
+      currentWriteBuffer[currentWriteOffset + 4] = (byte)(v);
+      currentWriteOffset += 5;
     } else {
       setByte(offset++, (byte)(v >>> 32));
       setByte(offset++, (byte)(v >>> 24));
@@ -463,10 +478,11 @@ public final class WriteBuffers implemen
     int prevIndex = currentWriteBufferIndex, prevOffset = currentWriteOffset;
     setWritePoint(offset);
     if (isAllInOneWriteBuffer(4)) {
-      currentWriteBuffer[currentWriteOffset++] = (byte)(v >> 24);
-      currentWriteBuffer[currentWriteOffset++] = (byte)(v >> 16);
-      currentWriteBuffer[currentWriteOffset++] = (byte)(v >> 8);
-      currentWriteBuffer[currentWriteOffset] = (byte)(v);
+      currentWriteBuffer[currentWriteOffset] = (byte)(v >> 24);
+      currentWriteBuffer[currentWriteOffset + 1] = (byte)(v >> 16);
+      currentWriteBuffer[currentWriteOffset + 2] = (byte)(v >> 8);
+      currentWriteBuffer[currentWriteOffset + 3] = (byte)(v);
+      currentWriteOffset += 4;
     } else {
       setByte(offset++, (byte)(v >>> 24));
       setByte(offset++, (byte)(v >>> 16));



Mime
View raw message