tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [1/2] TAJO-1084: Generated codes should access directly UnSafeTuple and RowWriter.
Date Wed, 01 Oct 2014 07:41:19 GMT
Repository: tajo
Updated Branches:
  refs/heads/block_iteration 292f67438 -> 95af3cce9


http://git-wip-us.apache.org/repos/asf/tajo/blob/95af3cce/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
index e00b977..e87c93c 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
@@ -103,8 +103,10 @@ public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder,
   }
 
   public void release() {
-    UnsafeUtil.free(buffer);
-    buffer = null;
-    address = 0;
+    if (buffer != null) { // buffer is nulled after freeing, so a repeated release() is a no-op
+      UnsafeUtil.free(buffer);
+      buffer = null;
+      address = 0;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/95af3cce/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
index e38555c..f3fdbb9 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
@@ -1,4 +1,4 @@
-/***
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -33,21 +33,24 @@ import java.nio.ByteBuffer;
 
 import static org.apache.tajo.common.TajoDataTypes.DataType;
 
+/**
+ * Immutable Tuple backed by a heap byte array; put(...) is not supported.
+ */
 public class HeapTuple implements Tuple {
   private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
   private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET;
 
-  private final byte [] data;
-  private final DataType [] types;
+  final byte [] data; // package-private so that HeapTupleBytesComparator can read it directly
+  private final DataType[] types; // one DataType per field; its length is the field count
 
-  public HeapTuple(final byte [] bytes, final DataType [] types) {
+  public HeapTuple(final byte [] bytes, final DataType[] types) {
     this.data = bytes;
     this.types = types;
   }
 
   @Override
   public int size() {
-    return data.length;
+    return types.length; // number of fields, not the byte length of data
   }
 
   public ByteBuffer nioBuffer() {
@@ -58,7 +61,7 @@ public class HeapTuple implements Tuple {
     return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
   }
 
-  private int checkNullAndGetOffset(int fieldId) {
+  int checkNullAndGetOffset(int fieldId) { // package-private for HeapTupleBytesComparator
     int offset = getFieldOffset(fieldId);
     if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) {
       throw new RuntimeException("Invalid Field Access: " + fieldId);
@@ -88,22 +91,22 @@ public class HeapTuple implements Tuple {
 
   @Override
   public void put(int fieldId, Datum value) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
+    throw new UnsupportedException("HeapTuple does not support put(int, Datum).");
   }
 
   @Override
   public void put(int fieldId, Datum[] values) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
+    throw new UnsupportedException("HeapTuple does not support put(int, Datum []).");
   }
 
   @Override
   public void put(int fieldId, Tuple tuple) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
+    throw new UnsupportedException("HeapTuple does not support put(int, Tuple).");
   }
 
   @Override
   public void put(Datum[] values) {
-    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+    throw new UnsupportedException("HeapTuple does not support put(Datum []).");
   }
 
   @Override
@@ -115,13 +118,15 @@ public class HeapTuple implements Tuple {
     switch (types[fieldId].getType()) {
     case BOOLEAN:
       return DatumFactory.createBool(getBool(fieldId));
+    case CHAR:
+      return DatumFactory.createChar(getBytes(fieldId));
     case INT1:
     case INT2:
       return DatumFactory.createInt2(getInt2(fieldId));
     case INT4:
       return DatumFactory.createInt4(getInt4(fieldId));
     case INT8:
-      return DatumFactory.createInt8(getInt4(fieldId));
+      return DatumFactory.createInt8(getInt8(fieldId)); // INT8 must be read as 8 bytes, not via getInt4
     case FLOAT4:
       return DatumFactory.createFloat4(getFloat4(fieldId));
     case FLOAT8:
@@ -138,6 +143,8 @@ public class HeapTuple implements Tuple {
       return getInterval(fieldId);
     case INET4:
       return DatumFactory.createInet4(getInt4(fieldId));
+    case BLOB:
+      return DatumFactory.createBlob(getBytes(fieldId));
     case PROTOBUF:
       return getProtobufDatum(fieldId);
     default:
@@ -251,7 +258,7 @@ public class HeapTuple implements Tuple {
 
   @Override
   public Datum[] getValues() {
-    Datum [] datums = new Datum[size()];
+    Datum[] datums = new Datum[size()];
     for (int i = 0; i < size(); i++) {
       if (contains(i)) {
         datums[i] = get(i);

http://git-wip-us.apache.org/repos/asf/tajo/blob/95af3cce/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTupleBytesComparator.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTupleBytesComparator.java
new file mode 100644
index 0000000..5298286
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTupleBytesComparator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedLongs;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteOrder;
+
+/**
+ * Lexicographically compares length-prefixed byte fields (e.g. UTF-8 text) of two
+ * {@link HeapTuple}s in place, reading their backing arrays without any copy. Bytes are
+ * compared as unsigned values, and a field that is a proper prefix of the other sorts
+ * first. It is used by compiled TupleComparators.
+ */
+public final class HeapTupleBytesComparator {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+  private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET;
+
+  static final boolean littleEndian =
+      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+  private HeapTupleBytesComparator() {
+    // Static utility class; not meant to be instantiated.
+  }
+
+  /**
+   * Compares field {@code fieldIdx1} of {@code t1} with field {@code fieldIdx2} of {@code t2}.
+   *
+   * @param t1 the left tuple
+   * @param fieldIdx1 index of the field to compare in {@code t1}
+   * @param t2 the right tuple
+   * @param fieldIdx2 index of the field to compare in {@code t2}
+   * @return a negative value, zero, or a positive value as the left field is less than,
+   *         equal to, or greater than the right field
+   * @throws RuntimeException if either field is null
+   */
+  public static int compare(HeapTuple t1, int fieldIdx1, HeapTuple t2, int fieldIdx2) {
+    long offset1 = t1.checkNullAndGetOffset(fieldIdx1);
+    long offset2 = t2.checkNullAndGetOffset(fieldIdx2);
+
+    // Each field is stored as an int byte length followed by the bytes themselves.
+    int lstrLen = UNSAFE.getInt(t1.data, BASE_OFFSET + offset1);
+    int rstrLen = UNSAFE.getInt(t2.data, BASE_OFFSET + offset2);
+
+    offset1 += SizeOf.SIZE_OF_INT;
+    offset2 += SizeOf.SIZE_OF_INT;
+
+    int minLength = Math.min(lstrLen, rstrLen);
+    int minWords = minLength / Longs.BYTES;
+
+    /*
+     * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+     * time is no slower than comparing 4 bytes at a time even on 32-bit.
+     * On the other hand, it is substantially faster on 64-bit.
+     */
+    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+      long lw = UNSAFE.getLong(t1.data, BASE_OFFSET + offset1);
+      long rw = UNSAFE.getLong(t2.data, BASE_OFFSET + offset2);
+      long diff = lw ^ rw;
+
+      if (diff != 0) {
+        if (!littleEndian) {
+          // Big-endian: unsigned word order equals byte-wise lexicographic order.
+          return UnsignedLongs.compare(lw, rw);
+        }
+
+        // Little-endian: binary search for the least significant non-zero byte of diff
+        // (the first differing byte in memory); n ends up as that byte's bit offset.
+        int n = 0;
+        int y;
+        int x = (int) diff;
+        if (x == 0) {
+          x = (int) (diff >>> 32);
+          n = 32;
+        }
+
+        y = x << 16;
+        if (y == 0) {
+          n += 16;
+        } else {
+          x = y;
+        }
+
+        y = x << 8;
+        if (y == 0) {
+          n += 8;
+        }
+        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+      }
+
+      offset1 += SizeOf.SIZE_OF_LONG;
+      offset2 += SizeOf.SIZE_OF_LONG;
+    }
+
+    // The epilogue covers the last (minLength % 8) bytes. Each byte is masked with 0xFF so
+    // it is compared as an unsigned value, consistently with the word-at-a-time loop above.
+    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+      int result = (UNSAFE.getByte(t1.data, BASE_OFFSET + offset1) & 0xFF)
+          - (UNSAFE.getByte(t2.data, BASE_OFFSET + offset2) & 0xFF);
+      offset1++;
+      offset2++;
+      if (result != 0) {
+        return result;
+      }
+    }
+    // The common prefix is equal, so the shorter field sorts first.
+    return lstrLen - rstrLen;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/95af3cce/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
index a1e5dbb..22253cb 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.tuple.offheap;
 
+import org.apache.tajo.annotation.UsedByJIT;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.IntervalDatum;
 import org.apache.tajo.datum.ProtobufDatum;
@@ -110,23 +111,49 @@ public abstract class OffHeapRowWriter implements RowWriter {
     forward(curOffset);
   }
 
+  public int currentField() { // index of the next field to be written
+    return curFieldIdx;
+  }
+
   public void skipField() {
-    fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET;
+    fieldOffsets[curFieldIdx] = OffHeapRowBlock.NULL_FIELD_OFFSET; // mark the current field as null
+    curFieldIdx++;
+  }
+
+  public void skipField(int num) { // NOTE(review): nulls fields [currentField(), min(num, dataTypes.length)), so num acts as an exclusive end index, not a count; confirm intent
+    for (int i = curFieldIdx; i < num && i < dataTypes.length; i++) {
+      fieldOffsets[curFieldIdx] = OffHeapRowBlock.NULL_FIELD_OFFSET;
+      curFieldIdx++;
+    }
   }
 
+  @UsedByJIT // NOTE(review): forwardField() is private, so generated classes cannot call it directly; confirm this annotation is intended
   private void forwardField() {
-    fieldOffsets[curFieldIdx++] = curOffset;
+    fieldOffsets[curFieldIdx] = curOffset;
+    curFieldIdx++;
   }
 
+  @UsedByJIT
   public void putBool(boolean val) {
     ensureSize(SizeOf.SIZE_OF_BOOL);
     forwardField();
 
-    OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, (byte) (val ? 0x01 : 0x00));
+    OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, (byte) (val ? 0x01 : 0x02)); // true = 0x01, false = 0x02 (was 0x00); NOTE(review): confirm all getBool() readers agree
 
     curOffset += SizeOf.SIZE_OF_BOOL;
   }
 
+  @UsedByJIT
+  public void putBool(byte val) { // writes an already-encoded boolean byte verbatim; the value is not validated
+    ensureSize(SizeOf.SIZE_OF_BOOL);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, val);
+
+    curOffset += SizeOf.SIZE_OF_BOOL;
+  }
+
+  @UsedByJIT
   public void putInt2(short val) {
     ensureSize(SizeOf.SIZE_OF_SHORT);
     forwardField();
@@ -135,6 +162,7 @@ public abstract class OffHeapRowWriter implements RowWriter {
     curOffset += SizeOf.SIZE_OF_SHORT;
   }
 
+  @UsedByJIT
   public void putInt4(int val) {
     ensureSize(SizeOf.SIZE_OF_INT);
     forwardField();
@@ -143,6 +171,7 @@ public abstract class OffHeapRowWriter implements RowWriter {
     curOffset += SizeOf.SIZE_OF_INT;
   }
 
+  @UsedByJIT
   public void putInt8(long val) {
     ensureSize(SizeOf.SIZE_OF_LONG);
     forwardField();
@@ -151,6 +180,7 @@ public abstract class OffHeapRowWriter implements RowWriter {
     curOffset += SizeOf.SIZE_OF_LONG;
   }
 
+  @UsedByJIT
   public void putFloat4(float val) {
     ensureSize(SizeOf.SIZE_OF_FLOAT);
     forwardField();
@@ -159,6 +189,7 @@ public abstract class OffHeapRowWriter implements RowWriter {
     curOffset += SizeOf.SIZE_OF_FLOAT;
   }
 
+  @UsedByJIT
   public void putFloat8(double val) {
     ensureSize(SizeOf.SIZE_OF_DOUBLE);
     forwardField();
@@ -167,11 +198,13 @@ public abstract class OffHeapRowWriter implements RowWriter {
     curOffset += SizeOf.SIZE_OF_DOUBLE;
   }
 
+  @UsedByJIT
   public void putText(String val) {
     byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET);
     putText(bytes);
   }
 
+  @UsedByJIT
   public void putText(byte[] val) {
     int bytesLen = val.length;
 
@@ -186,6 +219,23 @@ public abstract class OffHeapRowWriter implements RowWriter {
     curOffset += bytesLen;
   }
 
+  @UsedByJIT
+  public void copyTextFrom(UnSafeTuple tuple, int fieldIdx) { // copies a length-prefixed field straight from the tuple's memory without materializing a byte[]
+    long address = tuple.getFieldAddr(fieldIdx);
+    int strLen = OffHeapMemory.UNSAFE.getInt(address); // field layout: int length followed by the bytes
+    address += SizeOf.SIZE_OF_INT;
+
+    ensureSize(SizeOf.SIZE_OF_INT + strLen); // room for the length prefix plus the bytes
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, strLen);
+    curOffset += SizeOf.SIZE_OF_INT;
+
+    OffHeapMemory.UNSAFE.copyMemory(null, address, null, recordStartAddr() + curOffset, strLen);
+    curOffset += strLen;
+  }
+
+  @UsedByJIT
   public void putBlob(byte[] val) {
     int bytesLen = val.length;
 
@@ -200,18 +250,22 @@ public abstract class OffHeapRowWriter implements RowWriter {
     curOffset += bytesLen;
   }
 
+  @UsedByJIT
   public void putTimestamp(long val) {
     putInt8(val);
   }
 
+  @UsedByJIT
   public void putDate(int val) {
     putInt4(val);
   }
 
+  @UsedByJIT
   public void putTime(long val) {
     putInt8(val);
   }
 
+  @UsedByJIT
   public void putInterval(IntervalDatum val) {
     ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
     forwardField();
@@ -223,10 +277,12 @@ public abstract class OffHeapRowWriter implements RowWriter {
     curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG;
   }
 
+  @UsedByJIT
   public void putInet4(int val) {
     putInt4(val);
   }
 
+  @UsedByJIT
   public void putProtoDatum(ProtobufDatum val) {
     putBlob(val.asByteArray());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/95af3cce/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
index 59f8d1b..f6d5ba2 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
@@ -41,8 +41,12 @@ public interface RowWriter {
 
   public void skipField();
 
+  public void skipField(int num); // NOTE(review): OffHeapRowWriter treats num as an exclusive end field index, not a count; document the contract here
+
   public void putBool(boolean val);
 
+  public void putBool(byte val); // writes a pre-encoded boolean byte; OffHeapRowWriter encodes true as 0x01 and false as 0x02
+
   public void putInt2(short val);
 
   public void putInt4(int val);

http://git-wip-us.apache.org/repos/asf/tajo/blob/95af3cce/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
index 6f4d385..91b8e85 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
@@ -90,7 +90,7 @@ public abstract class UnSafeTuple implements Tuple {
 
   public long getFieldAddr(int fieldId) {
     int fieldOffset = getFieldOffset(fieldId);
-    if (fieldOffset == -1) {
+    if (fieldOffset == OffHeapRowBlock.NULL_FIELD_OFFSET) { // a null field has no address; use the shared sentinel rather than a literal -1
       throw new RuntimeException("Invalid Field Access: " + fieldId);
     }
     return bb.address() + relativePos + fieldOffset;


Mime
View raw message