tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [1/2] TAJO-36: Improve ExternalSortExec with N-merge sort and final pass omission.
Date Tue, 04 Feb 2014 21:09:14 GMT
Updated Branches:
  refs/heads/master 0781a3864 -> 5177dcfa4


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index 749b2e4..73b3692 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -37,10 +37,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.net.URI;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.Arrays;
 
 public class RawFile {
   private static final Log LOG = LogFactory.getLog(RawFile.class);
@@ -72,10 +70,14 @@ public class RawFile {
     }
 
     public void init() throws IOException {
-      //Preconditions.checkArgument(FileUtil.isLocalPath(path));
-      // TODO - to make it unified one.
-      URI uri = path.toUri();
-      fis = new FileInputStream(new File(uri));
+      File file;
+      if (path.toUri().getScheme() != null) {
+        file = new File(path.toUri());
+      } else {
+        file = new File(path.toString());
+      }
+
+      fis = new FileInputStream(file);
       channel = fis.getChannel();
       fileSize = channel.size();
 
@@ -132,6 +134,88 @@ public class RawFile {
       }
     }
 
+    /**
+     * Decode a ZigZag-encoded 32-bit value.  ZigZag encodes signed integers
+     * into values that can be efficiently encoded with varint.  (Otherwise,
+     * negative values must be sign-extended to 64 bits to be varint encoded,
+     * thus always taking 10 bytes on the wire.)
+     *
+     * @param n An unsigned 32-bit integer, stored in a signed int because
+     *          Java has no explicit unsigned support.
+     * @return A signed 32-bit integer.
+     */
+    public static int decodeZigZag32(final int n) {
+      return (n >>> 1) ^ -(n & 1);
+    }
+
+    /**
+     * Decode a ZigZag-encoded 64-bit value.  ZigZag encodes signed integers
+     * into values that can be efficiently encoded with varint.  (Otherwise,
+     * negative values must be sign-extended to 64 bits to be varint encoded,
+     * thus always taking 10 bytes on the wire.)
+     *
+     * @param n An unsigned 64-bit integer, stored in a signed long because
+     *          Java has no explicit unsigned support.
+     * @return A signed 64-bit integer.
+     */
+    public static long decodeZigZag64(final long n) {
+      return (n >>> 1) ^ -(n & 1);
+    }
+
+
+    /**
+     * Read a raw Varint from the stream.  If larger than 32 bits, discard the
+     * upper bits.
+     */
+    public int readRawVarint32() throws IOException {
+      byte tmp = buffer.get();
+      if (tmp >= 0) {
+        return tmp;
+      }
+      int result = tmp & 0x7f;
+      if ((tmp = buffer.get()) >= 0) {
+        result |= tmp << 7;
+      } else {
+        result |= (tmp & 0x7f) << 7;
+        if ((tmp = buffer.get()) >= 0) {
+          result |= tmp << 14;
+        } else {
+          result |= (tmp & 0x7f) << 14;
+          if ((tmp = buffer.get()) >= 0) {
+            result |= tmp << 21;
+          } else {
+            result |= (tmp & 0x7f) << 21;
+            result |= (tmp = buffer.get()) << 28;
+            if (tmp < 0) {
+              // Discard upper 32 bits.
+              for (int i = 0; i < 5; i++) {
+                if (buffer.get() >= 0) {
+                  return result;
+                }
+              }
+              throw new IOException("Invalid Variable int32");
+            }
+          }
+        }
+      }
+      return result;
+    }
+
+    /** Read a raw Varint from the stream. */
+    public long readRawVarint64() throws IOException {
+      int shift = 0;
+      long result = 0;
+      while (shift < 64) {
+        final byte b = buffer.get();
+        result |= (long)(b & 0x7F) << shift;
+        if ((b & 0x80) == 0) {
+          return result;
+        }
+        shift += 7;
+      }
+      throw new IOException("Invalid Variable int64");
+    }
+
     @Override
     public Tuple next() throws IOException {
       if(eof) return null;
@@ -175,11 +259,10 @@ public class RawFile {
             break;
 
           case CHAR :
-            int realLen = buffer.getInt();
-            byte[] buf = new byte[columnTypes[i].getLength()];
+            int realLen = readRawVarint32();
+            byte[] buf = new byte[realLen];
             buffer.get(buf);
-            byte[] charBuf = Arrays.copyOf(buf, realLen);
-            tuple.put(i, DatumFactory.createChar(charBuf));
+            tuple.put(i, DatumFactory.createChar(buf));
             break;
 
           case INT2 :
@@ -187,11 +270,11 @@ public class RawFile {
             break;
 
           case INT4 :
-            tuple.put(i, DatumFactory.createInt4(buffer.getInt()));
+            tuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32())));
             break;
 
           case INT8 :
-            tuple.put(i, DatumFactory.createInt8(buffer.getLong()));
+            tuple.put(i, DatumFactory.createInt8(decodeZigZag64(readRawVarint64())));
             break;
 
           case FLOAT4 :
@@ -202,28 +285,25 @@ public class RawFile {
             tuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
             break;
 
-          case TEXT :
-            // TODO - shoud use CharsetEncoder / CharsetDecoder
-            //byte [] rawBytes = getColumnBytes();
-            int strSize2 = buffer.getInt();
-            byte [] strBytes2 = new byte[strSize2];
-            buffer.get(strBytes2);
-            tuple.put(i, DatumFactory.createText(new String(strBytes2)));
+          case TEXT : {
+            int len = readRawVarint32();
+            byte [] strBytes = new byte[len];
+            buffer.get(strBytes);
+            tuple.put(i, DatumFactory.createText(new String(strBytes)));
             break;
+          }
 
           case BLOB : {
-            //byte [] rawBytes = getColumnBytes();
-            int byteSize = buffer.getInt();
-            byte [] rawBytes = new byte[byteSize];
+            int len = readRawVarint32();
+            byte [] rawBytes = new byte[len];
             buffer.get(rawBytes);
             tuple.put(i, DatumFactory.createBlob(rawBytes));
             break;
           }
 
           case PROTOBUF: {
-            //byte [] rawBytes = getColumnBytes();
-            int byteSize = buffer.getInt();
-            byte [] rawBytes = new byte[byteSize];
+            int len = readRawVarint32();
+            byte [] rawBytes = new byte[len];
             buffer.get(rawBytes);
 
             ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]);
@@ -324,9 +404,13 @@ public class RawFile {
     }
 
     public void init() throws IOException {
-      // TODO - RawFile only works on Local File System.
-      //Preconditions.checkArgument(FileUtil.isLocalPath(path));
-      File file = new File(path.toUri());
+      File file;
+      if (path.toUri().getScheme() != null) {
+        file = new File(path.toUri());
+      } else {
+        file = new File(path.toString());
+      }
+
       randomAccessFile = new RandomAccessFile(file, "rw");
       channel = randomAccessFile.getChannel();
       pos = 0;
@@ -383,6 +467,78 @@ public class RawFile {
       }
     }
 
+    /**
+     * Encode a ZigZag-encoded 32-bit value.  ZigZag encodes signed integers
+     * into values that can be efficiently encoded with varint.  (Otherwise,
+     * negative values must be sign-extended to 64 bits to be varint encoded,
+     * thus always taking 10 bytes on the wire.)
+     *
+     * @param n A signed 32-bit integer.
+     * @return An unsigned 32-bit integer, stored in a signed int because
+     *         Java has no explicit unsigned support.
+     */
+    public static int encodeZigZag32(final int n) {
+      // Note:  the right-shift must be arithmetic
+      return (n << 1) ^ (n >> 31);
+    }
+
+    /**
+     * Encode a ZigZag-encoded 64-bit value.  ZigZag encodes signed integers
+     * into values that can be efficiently encoded with varint.  (Otherwise,
+     * negative values must be sign-extended to 64 bits to be varint encoded,
+     * thus always taking 10 bytes on the wire.)
+     *
+     * @param n A signed 64-bit integer.
+     * @return An unsigned 64-bit integer, stored in a signed long because
+     *         Java has no explicit unsigned support.
+     */
+    public static long encodeZigZag64(final long n) {
+      // Note:  the right-shift must be arithmetic
+      return (n << 1) ^ (n >> 63);
+    }
+
+    /**
+     * Encode and write a varint.  {@code value} is treated as
+     * unsigned, so it won't be sign-extended if negative.
+     */
+    public void writeRawVarint32(int value) throws IOException {
+      while (true) {
+        if ((value & ~0x7F) == 0) {
+          buffer.put((byte) value);
+          return;
+        } else {
+          buffer.put((byte) ((value & 0x7F) | 0x80));
+          value >>>= 7;
+        }
+      }
+    }
+
+    /**
+     * Compute the number of bytes that would be needed to encode a varint.
+     * {@code value} is treated as unsigned, so it won't be sign-extended if
+     * negative.
+     */
+    public static int computeRawVarint32Size(final int value) {
+      if ((value & (0xffffffff <<  7)) == 0) return 1;
+      if ((value & (0xffffffff << 14)) == 0) return 2;
+      if ((value & (0xffffffff << 21)) == 0) return 3;
+      if ((value & (0xffffffff << 28)) == 0) return 4;
+      return 5;
+    }
+
+    /** Encode and write a varint. */
+    public void writeRawVarint64(long value) throws IOException {
+      while (true) {
+        if ((value & ~0x7FL) == 0) {
+          buffer.put((byte) value);
+          return;
+        } else {
+          buffer.put((byte) ((value & 0x7F) | 0x80));
+          value >>>= 7;
+        }
+      }
+    }
+
     @Override
     public void addTuple(Tuple t) throws IOException {
 
@@ -417,86 +573,73 @@ public class RawFile {
 
           case BOOLEAN:
           case BIT:
-            buffer.put(t.get(i).asByte());
-            break;
-
-          case CHAR :
-            byte[] src = t.getChar(i).asByteArray();
-            byte[] dst = Arrays.copyOf(src, columnTypes[i].getLength());
-            buffer.putInt(src.length);
-            buffer.put(dst);
+            buffer.put(t.getByte(i));
             break;
 
           case INT2 :
-            buffer.putShort(t.get(i).asInt2());
+            buffer.putShort(t.getInt2(i));
             break;
 
           case INT4 :
-            buffer.putInt(t.get(i).asInt4());
+            writeRawVarint32(encodeZigZag32(t.getInt4(i)));
             break;
 
           case INT8 :
-            buffer.putLong(t.get(i).asInt8());
+            writeRawVarint64(encodeZigZag64(t.getInt8(i)));
             break;
 
           case FLOAT4 :
-            buffer.putFloat(t.get(i).asFloat4());
+            buffer.putFloat(t.getFloat4(i));
             break;
 
           case FLOAT8 :
-            buffer.putDouble(t.get(i).asFloat8());
+            buffer.putDouble(t.getFloat8(i));
             break;
 
-          case TEXT:
-            byte [] strBytes2 = t.get(i).asByteArray();
-            if (flushBufferAndReplace(recordOffset, strBytes2.length + 4)) {
+          case CHAR:
+          case TEXT: {
+            byte [] strBytes = t.getBytes(i);
+            if (flushBufferAndReplace(recordOffset, strBytes.length + computeRawVarint32Size(strBytes.length))) {
               recordOffset = 0;
             }
-            buffer.putInt(strBytes2.length);
-            buffer.put(strBytes2);
+            writeRawVarint32(strBytes.length);
+            buffer.put(strBytes);
             break;
+          }
+
+        case DATE:
+          buffer.putInt(t.getInt4(i));
+          break;
+
+        case TIME:
+        case TIMESTAMP:
+          buffer.putLong(t.getInt8(i));
+          break;
 
           case BLOB : {
-            byte [] rawBytes = t.get(i).asByteArray();
-            if (flushBufferAndReplace(recordOffset, rawBytes.length + 4)) {
+            byte [] rawBytes = t.getBytes(i);
+            if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) {
               recordOffset = 0;
             }
-            buffer.putInt(rawBytes.length);
+            writeRawVarint32(rawBytes.length);
             buffer.put(rawBytes);
             break;
           }
 
           case PROTOBUF: {
-            // TODO - to be fixed
-//            byte [] lengthByte = new byte[4];
-//            byte [] byteArray = t.get(i).asByteArray();
-//            CodedOutputStream outputStream = CodedOutputStream.newInstance(lengthByte);
-//            outputStream.writeUInt32NoTag(byteArray.length);
-//            outputStream.flush();
-//            int legnthByteLength = CodedOutputStream.computeInt32SizeNoTag(byteArray.length);
-//            if (flushBufferAndReplace(recordOffset, byteArray.length + legnthByteLength)) {
-//              recordOffset = 0;
-//            }
-//            buffer.put(lengthByte, 0, legnthByteLength);
-            byte [] rawBytes = t.get(i).asByteArray();
-            if (flushBufferAndReplace(recordOffset, rawBytes.length + 4)) {
+            byte [] rawBytes = t.getBytes(i);
+            if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) {
               recordOffset = 0;
             }
-            buffer.putInt(rawBytes.length);
+            writeRawVarint32(rawBytes.length);
             buffer.put(rawBytes);
             break;
           }
 
           case INET4 :
-            buffer.put(t.get(i).asByteArray());
-            break;
-          case DATE:
-            buffer.putInt(t.get(i).asInt4());
-            break;
-          case TIME:
-          case TIMESTAMP:
-            buffer.putLong(t.get(i).asInt8());
+            buffer.put(t.getBytes(i));
             break;
+
           default:
             throw new IOException("Cannot support data type: " + columnTypes[i].getType());
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
index 1e89f31..954eb54 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -243,14 +243,6 @@ public class RowFile {
               tuple.put(i, datum);
               break;
 
-//            case TEXT :
-//              short len = buffer.getShort();
-//              byte[] buf = new byte[len];
-//              buffer.get(buf, 0, len);
-//              datum = DatumFactory.createText(buf);
-//              tuple.put(i, datum);
-//              break;
-
             case TEXT:
               short bytelen = buffer.getShort();
               byte[] strbytes = new byte[bytelen];
@@ -392,47 +384,47 @@ public class RowFile {
           col = schema.getColumn(i);
           switch (col.getDataType().getType()) {
             case BOOLEAN:
-              buffer.put(t.getBoolean(i).asByte());
+              buffer.put(t.get(i).asByte());
               break;
             case BIT:
-              buffer.put(t.getByte(i).asByte());
+              buffer.put(t.get(i).asByte());
               break;
             case CHAR:
-              byte[] src = t.getChar(i).asByteArray();
+              byte[] src = t.get(i).asByteArray();
               byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
               buffer.putInt(src.length);
               buffer.put(dst);
               break;
             case TEXT:
-              byte [] strbytes = t.getText(i).asByteArray();
+              byte [] strbytes = t.get(i).asByteArray();
               buffer.putShort((short)strbytes.length);
               buffer.put(strbytes, 0, strbytes.length);
               break;
             case INT2:
-              buffer.putShort(t.getShort(i).asInt2());
+              buffer.putShort(t.get(i).asInt2());
               break;
             case INT4:
-              buffer.putInt(t.getInt(i).asInt4());
+              buffer.putInt(t.get(i).asInt4());
               break;
             case INT8:
-              buffer.putLong(t.getLong(i).asInt8());
+              buffer.putLong(t.get(i).asInt8());
               break;
             case FLOAT4:
-              buffer.putFloat(t.getFloat(i).asFloat4());
+              buffer.putFloat(t.get(i).asFloat4());
               break;
             case FLOAT8:
-              buffer.putDouble(t.getDouble(i).asFloat8());
+              buffer.putDouble(t.get(i).asFloat8());
               break;
             case BLOB:
-              byte [] bytes = t.getBytes(i).asByteArray();
+              byte [] bytes = t.get(i).asByteArray();
               buffer.putShort((short)bytes.length);
               buffer.put(bytes);
               break;
             case INET4:
-              buffer.put(t.getIPv4Bytes(i));
+              buffer.put(t.get(i).asByteArray());
               break;
             case INET6:
-              buffer.put(t.getIPv6Bytes(i));
+              buffer.put(t.get(i).asByteArray());
               break;
             case NULL_TYPE:
               nullFlags.set(i);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
index e54fb57..b0cb09d 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -191,10 +191,10 @@ public class RowStoreUtil {
             bb.put(bytes);
             break;
           case INET4:
-            byte [] ipBytes = tuple.getIPv4Bytes(i);
+            byte [] ipBytes = tuple.get(i).asByteArray();
             bb.put(ipBytes);
             break;
-          case INET6: bb.put(tuple.getIPv6Bytes(i)); break;
+          case INET6: bb.put(tuple.get(i).asByteArray()); break;
           default:
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
index ba35988..a05dc71 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
@@ -18,9 +18,7 @@
 
 package org.apache.tajo.storage;
 
-import org.apache.tajo.datum.*;
-
-import java.net.InetAddress;
+import org.apache.tajo.datum.Datum;
 
 public interface Tuple extends Cloneable {
   
@@ -46,35 +44,25 @@ public interface Tuple extends Cloneable {
 	
 	public long getOffset();
 
-	public BooleanDatum getBoolean(int fieldId);
-	
-	public BitDatum getByte(int fieldId);
+	public boolean getBool(int fieldId);
 
-  public CharDatum getChar(int fieldId);
-	
-	public BlobDatum getBytes(int fieldId);
-	
-	public Int2Datum getShort(int fieldId);
-	
-	public Int4Datum getInt(int fieldId);
-	
-	public Int8Datum getLong(int fieldId);
+	public byte getByte(int fieldId);
+
+  public char getChar(int fieldId);
 	
-	public Float4Datum getFloat(int fieldId);
+	public byte [] getBytes(int fieldId);
 	
-	public Float8Datum getDouble(int fieldId);
+	public short getInt2(int fieldId);
 	
-	public Inet4Datum getIPv4(int fieldId);
+	public int getInt4(int fieldId);
 	
-	public byte [] getIPv4Bytes(int fieldId);
+	public long getInt8(int fieldId);
 	
-	public InetAddress getIPv6(int fieldId);
+	public float getFloat4(int fieldId);
 	
-	public byte [] getIPv6Bytes(int fieldId);
+	public double getFloat8(int fieldId);
 	
-	public TextDatum getString(int fieldId);
-
-  public TextDatum getText(int fieldId);
+	public String getText(int fieldId);
 
   public Tuple clone() throws CloneNotSupportedException;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
index 878c05e..72a4566 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
@@ -34,8 +34,7 @@ public class VTuple implements Tuple, Cloneable {
 	}
 
   public VTuple(Tuple tuple) {
-    this.values = new Datum[tuple.size()];
-    System.arraycopy(((VTuple)tuple).values, 0, values, 0, tuple.size());
+    this.values = tuple.getValues().clone();
     this.offset = ((VTuple)tuple).offset;
   }
 
@@ -106,47 +105,55 @@ public class VTuple implements Tuple, Cloneable {
 	}
 	
 	@Override
-	public BooleanDatum getBoolean(int fieldId) {
-		return (BooleanDatum) values[fieldId];
+	public boolean getBool(int fieldId) {
+		return values[fieldId].asBool();
 	}
 
-	public BitDatum getByte(int fieldId) {
-		return (BitDatum) values[fieldId];
+  @Override
+	public byte getByte(int fieldId) {
+		return values[fieldId].asByte();
 	}
 
-  public CharDatum getChar(int fieldId) {
-    return (CharDatum) values[fieldId];
+  @Override
+  public char getChar(int fieldId) {
+    return values[fieldId].asChar();
   }
 
-	public BlobDatum getBytes(int fieldId) {
-		return (BlobDatum) values[fieldId];
+  @Override
+	public byte [] getBytes(int fieldId) {
+		return values[fieldId].asByteArray();
 	}
 
-	public Int2Datum getShort(int fieldId) {
-		return (Int2Datum) values[fieldId];
+  @Override
+	public short getInt2(int fieldId) {
+		return values[fieldId].asInt2();
 	}
 
-	public Int4Datum getInt(int fieldId) {
-		return (Int4Datum) values[fieldId];
+  @Override
+	public int getInt4(int fieldId) {
+		return values[fieldId].asInt4();
 	}
 
-	public Int8Datum getLong(int fieldId) {
-		return (Int8Datum) values[fieldId];
+  @Override
+	public long getInt8(int fieldId) {
+		return values[fieldId].asInt8();
 	}
 
-	public Float4Datum getFloat(int fieldId) {
-		return (Float4Datum) values[fieldId];
+  @Override
+	public float getFloat4(int fieldId) {
+		return values[fieldId].asFloat4();
 	}
 
-	public Float8Datum getDouble(int fieldId) {
-		return (Float8Datum) values[fieldId];
+  @Override
+	public double getFloat8(int fieldId) {
+		return values[fieldId].asFloat8();
 	}
 
 	public Inet4Datum getIPv4(int fieldId) {
 		return (Inet4Datum) values[fieldId];
 	}
 
-	public byte[] getIPv4Bytes(int fieldId) {
+	public byte [] getIPv4Bytes(int fieldId) {
 		return values[fieldId].asByteArray();
 	}
 
@@ -158,14 +165,10 @@ public class VTuple implements Tuple, Cloneable {
 	  throw new InvalidCastException("IPv6 is unsupported yet");
 	}
 
-	public TextDatum getString(int fieldId) {
-		return (TextDatum) values[fieldId];
-	}
-
   @Override
-  public TextDatum getText(int fieldId) {
-    return (TextDatum) values[fieldId];
-  }
+	public String getText(int fieldId) {
+		return values[fieldId].asChars();
+	}
 
   @Override
   public Tuple clone() throws CloneNotSupportedException {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
index e1430e1..2618527 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
@@ -127,9 +127,9 @@ public class TestLazyTuple {
     assertTrue(t1.contains(0));
     assertTrue(t1.contains(1));
 
-    assertEquals(t1.getString(0).toString(), "str");
-    assertEquals(t1.getInt(1).asInt4(), 2);
-    assertTrue(t1.getFloat(11).asFloat4() == 0.76f);
+    assertEquals(t1.getText(0), "str");
+    assertEquals(t1.get(1).asInt4(), 2);
+    assertTrue(t1.get(11).asFloat4() == 0.76f);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index 1685675..f4ce46b 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -178,8 +178,8 @@ public class TestStorages {
       if (storeType == StoreType.RCFILE || storeType == StoreType.TREVNI || storeType == StoreType.CSV) {
         assertTrue(tuple.get(0) == null);
       }
-      assertEquals(DatumFactory.createInt8(tupleCnt + 2), tuple.getLong(1));
-      assertEquals(DatumFactory.createFloat4(tupleCnt + 3), tuple.getFloat(2));
+      assertTrue(tupleCnt + 2 == tuple.get(1).asInt8());
+      assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4());
       tupleCnt++;
     }
     scanner.close();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java
index 05f47a5..9837fd1 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java
@@ -67,9 +67,9 @@ public class TestVTuple {
 		assertTrue(t1.contains(0));
 		assertTrue(t1.contains(1));
 		
-		assertEquals(t1.getString(0).toString(),"str");
-		assertEquals(t1.getInt(1).asInt4(),2);
-		assertTrue(t1.getFloat(257).asFloat4() == 0.76f);
+		assertEquals(t1.getText(0),"str");
+		assertEquals(t1.get(1).asInt4(),2);
+		assertTrue(t1.get(257).asFloat4() == 0.76f);
 	}
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
index a3d3133..05be40a 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
@@ -176,8 +176,8 @@ public class TestStorages {
       if (storeType == StoreType.RCFILE || storeType == StoreType.TREVNI || storeType == StoreType.CSV) {
         assertTrue(tuple.get(0) == null || tuple.get(0) instanceof NullDatum);
       }
-      assertEquals(DatumFactory.createInt8(tupleCnt + 2), tuple.getLong(1));
-      assertEquals(DatumFactory.createFloat4(tupleCnt + 3), tuple.getFloat(2));
+      assertTrue(tupleCnt + 2 == tuple.get(1).asInt8());
+      assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4());
       tupleCnt++;
     }
     scanner.close();


Mime
View raw message