tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject [01/20] tajo git commit: TAJO-1151: Implement the ByteBuffer-based De/Serializer. (jinho)
Date Tue, 25 Nov 2014 14:45:42 GMT
Repository: tajo
Updated Branches:
  refs/heads/hbase_storage 69373878b -> 87c957e43


TAJO-1151: Implement the ByteBuffer-based De/Serializer. (jinho)

Closes #228


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c193cfaa
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c193cfaa
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c193cfaa

Branch: refs/heads/hbase_storage
Commit: c193cfaa8eb50a83bbbf46624bcccb81c0cc7445
Parents: 55084a8
Author: jhkim <jhkim@apache.org>
Authored: Thu Nov 13 11:12:55 2014 +0900
Committer: jhkim <jhkim@apache.org>
Committed: Thu Nov 13 11:12:55 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 tajo-common/pom.xml                             |   4 +
 .../java/org/apache/tajo/util/NumberUtil.java   | 700 +++++++++++++++++--
 .../tajo/storage/text/DelimitedTextFile.java    |  25 +-
 .../text/TextFieldSerializerDeserializer.java   |  56 +-
 5 files changed, 694 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/c193cfaa/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 7f5ec76..d255192 100644
--- a/CHANGES
+++ b/CHANGES
@@ -75,6 +75,8 @@ Release 0.9.1 - unreleased
 
   SUB TASKS
 
+    TAJO-1151: Implement the ByteBuffer-based De/Serializer. (jinho)
+
     TAJO-1152: RawFile ByteBuffer should be reuse. (jinho)
 
     TAJO-1149: Implement direct read of DelimitedTextFile. (jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/c193cfaa/tajo-common/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-common/pom.xml b/tajo-common/pom.xml
index 9bbd5a7..a98060d 100644
--- a/tajo-common/pom.xml
+++ b/tajo-common/pom.xml
@@ -216,6 +216,10 @@
       <artifactId>gson</artifactId>
     </dependency>
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/tajo/blob/c193cfaa/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
index 375a2e4..e3dd956 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
@@ -18,9 +18,15 @@
 
 package org.apache.tajo.util;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.io.OutputStream;
 import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
+import java.nio.charset.Charset;
 
+// this is an implementation copied from LazyPrimitives in hive
 public class NumberUtil {
 
   public static final double[] powersOf10 = {	/* Table giving binary powers of 10.  Entry
*/
@@ -41,58 +47,35 @@ public class NumberUtil {
                                                */
 
   public static long unsigned32(int n) {
-		return n & 0xFFFFFFFFL;
-	}
-	
-	public static int unsigned16(short n) {
-		return n & 0xFFFF;
-	}
-
-  public static byte[] toAsciiBytes(Number i){
-    return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray());
+    // Widen to long and mask off the sign extension so the 32 bits of n read as unsigned.
+    return n & 0xFFFFFFFFL;
   }
 
-  public static byte[] toAsciiBytes(short i){
-    return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray());
+  /** Returns the 16 bits of {@code n} interpreted as an unsigned value (0 to 65535). */
+  public static int unsigned16(short n) {
+    return n & 0xFFFF;
   }
 
-  public static byte[] toAsciiBytes(int i){
+  /** Returns {@code String.valueOf(i)} converted to bytes by {@code BytesUtils.toASCIIBytes}. */
+  public static byte[] toAsciiBytes(Number i) {
     return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray());
   }
 
-  public static byte[] toAsciiBytes(long i){
+  /** Returns {@code String.valueOf(i)} converted to bytes by {@code BytesUtils.toASCIIBytes}. */
+  public static byte[] toAsciiBytes(short i) {
     return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray());
   }
 
-  public static byte[] toAsciiBytes(float i){
+  /** Returns {@code String.valueOf(i)} converted to bytes by {@code BytesUtils.toASCIIBytes}. */
+  public static byte[] toAsciiBytes(int i) {
     return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray());
   }
 
-  public static byte[] toAsciiBytes(double i){
+  /** Returns {@code String.valueOf(i)} converted to bytes by {@code BytesUtils.toASCIIBytes}. */
+  public static byte[] toAsciiBytes(long i) {
     return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray());
   }
 
-  private static void benchmark(int num){
-    System.out.println("Start benchmark. # of :" + num);
-    long start =  System.currentTimeMillis();
-
-    byte[] bytes;
-    long size = 0;
-    for (int i = 0; i < num; i ++){
-      bytes = String.valueOf(i).getBytes();
-      size += bytes.length;
-    }
-
-    long end =  System.currentTimeMillis();
-    System.out.println("JDK getBytes() \t\t\t\t" + (end - start) + " ms, " + "Total: " +
size / (1024 * 1024) + "MB");
-    size = 0;
+  /** Returns {@code String.valueOf(i)} converted to bytes by {@code BytesUtils.toASCIIBytes}. */
+  public static byte[] toAsciiBytes(float i) {
+    return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray());
+  }
 
-    for (int i = 0; i < num; i ++){
-      bytes = toAsciiBytes(i);
-      size += bytes.length;
-    }
-    System.out.println( "NumberUtil toByte() \t" + (System.currentTimeMillis() - end)
-        + " ms, " + "Total: " + size / (1024 * 1024) + "MB");
+  /** Returns {@code String.valueOf(i)} converted to bytes by {@code BytesUtils.toASCIIBytes}. */
+  public static byte[] toAsciiBytes(double i) {
+    return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray());
   }
 
   /**
@@ -324,7 +307,7 @@ public class NumberUtil {
    * int quantity. The second argument specifies the radix to use when parsing
    * the value.
    *
-   * @param radix  the base to use for conversion.
+   * @param radix the base to use for conversion.
    * @return the value represented by the argument
    * @throws NumberFormatException if the argument could not be parsed as an int quantity.
    */
@@ -411,22 +394,655 @@ public class NumberUtil {
     }
     return result;
   }
-  
+
+  /**
+   * Parses the decimal (radix 10) text in {@code bytes[start, start + length)} as a long
+   * value and returns the result. An optional leading '-' or '+' is accepted, and a
+   * fractional part introduced by '.' is accepted and truncated toward zero.
+   *
+   * @param bytes  the buffer holding the ASCII/UTF-8 text of a long quantity
+   * @param start  index of the first byte of the text within {@code bytes}
+   * @param length number of bytes making up the text
+   * @return long the value represented by the argument
+   * @throws NumberFormatException if the argument could not be parsed as a long quantity.
+   */
+  public static long parseLong(byte[] bytes, int start, int length) {
+    return parseLong(bytes, start, length, 10);
+  }
+
+  /**
+   * Parses the text in {@code bytes[start, start + length)} as a long value in the given
+   * radix and returns the result. An optional leading '-' or '+' is accepted, and a
+   * fractional part introduced by '.' is accepted and truncated toward zero.
+   *
+   * @param bytes  the buffer holding the ASCII/UTF-8 text of a long quantity
+   * @param start  index of the first byte of the text within {@code bytes}
+   * @param length number of bytes making up the text; must be positive
+   * @param radix  the base to use for conversion, between {@link Character#MIN_RADIX} and
+   *               {@link Character#MAX_RADIX}
+   * @return the value represented by the argument
+   * @throws NumberFormatException if {@code bytes} is null, the radix or length is invalid,
+   *                               or the text could not be parsed as a long quantity
+   */
+  public static long parseLong(byte[] bytes, int start, int length, int radix) {
+    if (bytes == null) {
+      throw new NumberFormatException("String is null");
+    }
+    if (radix < Character.MIN_RADIX || radix > Character.MAX_RADIX) {
+      throw new NumberFormatException("Invalid radix: " + radix);
+    }
+    // A negative length would skip every digit loop below and silently parse as 0.
+    if (length < 0) {
+      throw new NumberFormatException("Invalid length: " + length);
+    }
+    if (length == 0) {
+      throw new NumberFormatException("Empty string!");
+    }
+    int offset = start;
+    boolean negative = bytes[start] == '-';
+    if (negative || bytes[start] == '+') {
+      offset++;
+      // A lone sign has no digits.
+      if (length == 1) {
+        throw new NumberFormatException(new String(bytes, start,
+            length));
+      }
+    }
+
+    return parseLongInternal(bytes, start, length, offset, radix, negative);
+  }
+
+  /**
+   * Parses {@code bytes[offset, start + length)} as the digits of a long value in the given
+   * radix, after the caller has already consumed an optional sign.
+   * <p>
+   * Digits are accumulated as a negative number because the negative range of long is one
+   * larger than the positive range; this lets {@code Long.MIN_VALUE} be parsed without
+   * overflow. The result is negated at the end unless {@code negative} is set.
+   * A '.' ends the integral part: the value is truncated toward zero, and the remaining
+   * bytes are only checked to be valid digits.
+   *
+   * @param bytes    the buffer holding the text
+   * @param start    index of the first byte of the whole text (including any sign); used for
+   *                 the end position and for error messages
+   * @param length   number of bytes in the whole text (including any sign)
+   * @param offset   the starting position after the sign (if exists)
+   * @param radix    the base to use for conversion.
+   * @param negative whether the number is negative.
+   * @return the value represented by the argument
+   * @throws NumberFormatException if the argument could not be parsed as a long quantity.
+   */
+  private static long parseLongInternal(byte[] bytes, int start, int length, int offset,
+                                        int radix, boolean negative) {
+    byte separator = '.';
+    // Any result below max would overflow when multiplied by radix.
+    long max = Long.MIN_VALUE / radix;
+    long result = 0, end = start + length;
+    while (offset < end) {
+      // digit(...) is expected to return -1 for bytes that are not digits in this radix.
+      int digit = digit(bytes[offset++], radix);
+      if (digit == -1 || max > result) {
+        if (bytes[offset - 1] == separator) {
+          // We allow decimals and will return a truncated integer in that case.
+          // Therefore we won't throw an exception here (checking the fractional
+          // part happens below.)
+          break;
+        }
+        throw new NumberFormatException(new String(bytes, start,
+            length));
+      }
+      // Subtracting the digit may wrap past Long.MIN_VALUE; a wrapped value is larger.
+      long next = result * radix - digit;
+      if (next > result) {
+        throw new NumberFormatException(new String(bytes, start,
+            length));
+      }
+      result = next;
+    }
+
+    // This is the case when we've encountered a decimal separator. The fractional
+    // part will not change the number, but we will verify that the fractional part
+    // is well formed.
+    while (offset < end) {
+      int digit = digit(bytes[offset++], radix);
+      if (digit == -1) {
+        throw new NumberFormatException(new String(bytes, start,
+            length));
+      }
+    }
+
+    // Negating Long.MIN_VALUE leaves it negative, which signals positive overflow.
+    if (!negative) {
+      result = -result;
+      if (result < 0) {
+        throw new NumberFormatException(new String(bytes, start,
+            length));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Writes the base-10 text form of {@code i} to {@code out}, one ASCII byte per character
+   * (ASCII digits and '-' encode identically in UTF-8), without allocating a String.
+   * <p>
+   * The radix is fixed at 10 on purpose: division by a constant is much cheaper than
+   * division by a variable.
+   *
+   * @param out the stream receiving the characters
+   * @param i   the int to write; Integer.MIN_VALUE is supported
+   * @throws IOException if {@code out} fails to accept a byte
+   */
+  public static void writeUTF8(OutputStream out, int i) throws IOException {
+    if (i == 0) {
+      out.write('0');
+      return;
+    }
+
+    // Work on the non-positive value: negating a positive int never overflows, whereas
+    // negating Integer.MIN_VALUE would.
+    int negated;
+    if (i < 0) {
+      out.write('-');
+      negated = i;
+    } else {
+      negated = -i;
+    }
+
+    // Place value of the most significant digit.
+    int divisor = 1000000000;
+    while (negated / divisor == 0) {
+      divisor /= 10;
+    }
+
+    // Each remainder lies in [-9, 0]; flipping its sign yields the digit.
+    for (; divisor > 0; divisor /= 10) {
+      int digit = -(negated / divisor % 10);
+      out.write('0' + digit);
+    }
+  }
+
+  /**
+   * Writes the base-10 text form of the long {@code i} to {@code out}, one ASCII byte per
+   * character (ASCII digits and '-' encode identically in UTF-8).
+   * <p>
+   * Note: division by a constant (like 10) is much faster than division by a
+   * variable. That's one of the reasons that we don't make radix a parameter
+   * here.
+   *
+   * @param out the outputstream to write to
+   * @param i   the long to write out; Long.MIN_VALUE is supported
+   * @throws java.io.IOException if {@code out} fails to accept a byte
+   */
+  public static void writeUTF8(OutputStream out, long i) throws IOException {
+    if (i == 0) {
+      out.write('0');
+      return;
+    }
+
+    boolean negative = i < 0;
+    if (negative) {
+      out.write('-');
+    } else {
+      // negative range is bigger than positive range, so there is no risk
+      // of overflow here.
+      i = -i;
+    }
+
+    // Place value of the most significant digit (a long has at most 19 digits).
+    long start = 1000000000000000000L;
+    while (i / start == 0) {
+      start /= 10;
+    }
+
+    // i is non-positive, so each remainder lies in [-9, 0].
+    while (start > 0) {
+      out.write('0' - (int) ((i / start) % 10));
+      start /= 10;
+    }
+  }
+
+  /**
+   * Parses the readable region of {@code bytes} (starting at its reader index and spanning
+   * {@code readableBytes()} bytes) as a double value.
+   *
+   * @param bytes the buffer holding the text of a double value
+   * @return double, the value represented by the readable bytes
+   * @throws NumberFormatException if the readable bytes could not be parsed as a double
+   */
+  public static double parseDouble(ByteBuf bytes) {
+    int from = bytes.readerIndex();
+    int count = bytes.readableBytes();
+    return parseDouble(bytes, from, count);
+  }
+
+  /**
+   * Parses {@code length} bytes of {@code bytes}, starting at absolute index {@code start},
+   * as a double value and returns the result. Leading blanks (' ') are skipped, an optional
+   * sign is accepted, and an optional exponent introduced by 'E' or 'e' may follow the
+   * mantissa.
+   * <p>
+   * Buffers that expose a memory address are read directly through
+   * {@link PlatformDependent#getByte(long)}. Any other buffer is delegated to
+   * {@link #parseDouble(byte[], int, int)}: heap buffers through their backing array
+   * (honouring {@link ByteBuf#arrayOffset()}), other buffers through a temporary copy.
+   *
+   * @param bytes  the buffer holding the text of a double value
+   * @param start  absolute index of the first byte of the text
+   * @param length number of bytes making up the text; must be positive
+   * @return double, the value represented by the argument
+   * @throws NumberFormatException if the argument could not be parsed as a double
+   */
+  public static double parseDouble(ByteBuf bytes, int start, int length) {
+    if (bytes == null) {
+      throw new NumberFormatException("String is null");
+    }
+
+    if (start < 0 || length <= 0 || bytes.writerIndex() < start + length) {
+      throw new NumberFormatException("Empty string or Invalid buffer!");
+    }
+
+    if (!bytes.hasMemoryAddress()) {
+      // memoryAddress() is unsupported for such buffers (e.g. heap buffers), so use the
+      // byte[] parser instead.
+      if (bytes.hasArray()) {
+        // array() is the whole backing array; this buffer's index 0 is at arrayOffset().
+        return parseDouble(bytes.array(), bytes.arrayOffset() + start, length);
+      }
+      byte[] copy = new byte[length];
+      bytes.getBytes(start, copy);
+      return parseDouble(copy, 0, length);
+    }
+
+    long memoryAddress = bytes.memoryAddress();
+    int offset = start;
+    int end = start + length;
+
+    /*
+     * Strip off leading blanks.
+     */
+    while (offset < end && PlatformDependent.getByte(memoryAddress + offset) == ' ') {
+      offset++;
+    }
+    if (offset == end) {
+      throw new NumberFormatException("blank byte array!");
+    }
+
+    /*
+     * Check for a sign.
+     */
+    boolean sign = false;
+    byte first = PlatformDependent.getByte(memoryAddress + offset);
+    if (first == '-') {
+      sign = true;
+      offset++;
+    } else if (first == '+') {
+      offset++;
+    }
+    if (offset == end) {
+      throw new NumberFormatException("the byte array only has a sign!");
+    }
+
+    /*
+     * Count the number of digits in the mantissa (including the decimal
+     * point), and also locate the decimal point.
+     */
+    int mantSize = 0;          /* Number of digits in mantissa. */
+    int decimalOffset = -1;    /* Number of mantissa digits BEFORE decimal point. */
+    for (; offset < end; offset++) {
+      byte b = PlatformDependent.getByte(memoryAddress + offset);
+      if (!isDigit(b)) {
+        if (b != '.' || decimalOffset >= 0) {
+          break;
+        }
+        decimalOffset = mantSize;
+      }
+      mantSize++;
+    }
+
+    int exponentOffset = offset; /* Location of the exponent marker, if any. */
+
+    /*
+     * Now suck up the digits in the mantissa.  Use two integers to
+     * collect 9 digits each (this is faster than using floating-point).
+     * If the mantissa has more than 18 digits, ignore the extras, since
+     * they can't affect the value anyway.
+     */
+    offset -= mantSize;
+    if (decimalOffset < 0) {
+      decimalOffset = mantSize;
+    } else {
+      mantSize -= 1;           /* One of the digits was the decimal point. */
+    }
+    int fracExponent;          /* Exponent that derives from the fractional
+                                * part.  Under normal circumstances, it is
+                                * the negative of the number of digits in F.
+                                * However, if I is very long, the last digits
+                                * of I get dropped (otherwise a long I with a
+                                * large negative exponent could cause an
+                                * unnecessary overflow on I alone).  In this
+                                * case, fracExp is incremented one for each
+                                * dropped digit. */
+    if (mantSize > 18) {
+      fracExponent = decimalOffset - 18;
+      mantSize = 18;
+    } else {
+      fracExponent = decimalOffset - mantSize;
+    }
+
+    if (mantSize == 0) {
+      // NOTE(review): text without any mantissa digit (e.g. "abc" or ".") yields 0.0 here
+      // rather than an error; kept as-is - confirm it matches parseDouble(byte[], int, int).
+      return 0.0;
+    }
+
+    int frac1 = 0;
+    for (; mantSize > 9; mantSize--) {
+      int b = PlatformDependent.getByte(memoryAddress + offset);
+      offset++;
+      if (b == '.') {
+        b = PlatformDependent.getByte(memoryAddress + offset);
+        offset++;
+      }
+      frac1 = 10 * frac1 + (b - '0');
+    }
+    int frac2 = 0;
+    for (; mantSize > 0; mantSize--) {
+      int b = PlatformDependent.getByte(memoryAddress + offset);
+      offset++;
+      if (b == '.') {
+        b = PlatformDependent.getByte(memoryAddress + offset);
+        offset++;
+      }
+      frac2 = 10 * frac2 + (b - '0');
+    }
+    double fraction = (1.0e9 * frac1) + frac2;
+
+    /*
+     * Skim off the exponent. It is kept in a long so that very long exponent strings
+     * cannot wrap around into a small, valid-looking exponent.
+     */
+    long exponent = 0;            /* Exponent read from "EX" field. */
+    offset = exponentOffset;
+    boolean expSign = false;
+
+    if (offset < end) {
+      byte marker = PlatformDependent.getByte(memoryAddress + offset);
+      if (marker != 'E' && marker != 'e') {
+        throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset()));
+      }
+      offset++;
+
+      // Every read is guarded by offset < end: text such as "1E" ends right after the
+      // marker, and reading further would touch memory outside [start, start + length).
+      if (offset < end) {
+        byte expSignByte = PlatformDependent.getByte(memoryAddress + offset);
+        if (expSignByte == '-') {
+          expSign = true;
+          offset++;
+        } else if (expSignByte == '+') {
+          offset++;
+        }
+      }
+
+      for (; offset < end; offset++) {
+        byte b = PlatformDependent.getByte(memoryAddress + offset);
+        if (!isDigit(b)) {
+          throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset()));
+        }
+        // Saturate instead of overflowing; a saturated exponent is far beyond maxExponent
+        // and is rejected below.
+        if (exponent < Integer.MAX_VALUE) {
+          exponent = exponent * 10 + (b - '0');
+        }
+      }
+    }
+
+    exponent = expSign ? (fracExponent - exponent) : (fracExponent + exponent);
+
+    /*
+     * Generate a floating-point number that represents the exponent.
+     * Do this by processing the exponent one bit at a time to combine
+     * many powers of 2 of 10. Then combine the exponent with the
+     * fraction.
+     */
+    if (exponent < 0) {
+      expSign = true;
+      exponent = -exponent;
+    } else {
+      expSign = false;
+    }
+    if (exponent > maxExponent) {
+      throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset()));
+    }
+
+    double dblExp = 1.0;
+    for (int i = 0; exponent != 0; exponent >>= 1, i++) {
+      if ((exponent & 1) == 1) {
+        dblExp *= powersOf10[i];
+      }
+    }
+
+    fraction = expSign ? (fraction / dblExp) : (fraction * dblExp);
+
+    return sign ? -fraction : fraction;
+  }
+
+
+  /**
+   * Parses the readable region of {@code bytes} (starting at its reader index and spanning
+   * {@code readableBytes()} bytes) as a base-10 int.
+   *
+   * @param bytes the buffer holding the text of an int quantity
+   * @return int the value represented by the readable bytes
+   * @throws NumberFormatException if the readable bytes could not be parsed as an int quantity.
+   */
+  public static int parseInt(ByteBuf bytes) {
+    int from = bytes.readerIndex();
+    int count = bytes.readableBytes();
+    return parseInt(bytes, from, count);
+  }
+
+  /**
+   * Parses {@code length} bytes of {@code bytes}, starting at absolute index {@code start},
+   * as a base-10 int.
+   *
+   * @param bytes  the buffer holding the text of an int quantity
+   * @param start  absolute index of the first byte of the text
+   * @param length number of bytes making up the text
+   * @return int the value represented by the argument
+   * @throws NumberFormatException if the argument could not be parsed as an int quantity.
+   */
+  public static int parseInt(ByteBuf bytes, int start, int length) {
+    final int decimalRadix = 10;
+    return parseInt(bytes, start, length, decimalRadix);
+  }
+
+  /**
+   * Parses {@code length} bytes of {@code bytes}, starting at absolute index {@code start},
+   * as an int in the given radix. An optional leading '-' or '+' is accepted, and a
+   * fractional part introduced by '.' is truncated toward zero.
+   * <p>
+   * Buffers exposing a memory address are read directly; other buffers are handed to
+   * {@link #parseInt(byte[], int, int, int)} with the same radix (heap buffers via their
+   * backing array, others via a temporary copy).
+   *
+   * @param bytes  the buffer holding the text of an int quantity
+   * @param start  absolute index of the first byte of the text
+   * @param length number of bytes making up the text; must be positive
+   * @param radix  the base to use for conversion.
+   * @return the value represented by the argument
+   * @throws NumberFormatException if the argument could not be parsed as an int quantity.
+   */
+  public static int parseInt(ByteBuf bytes, int start, int length, int radix) {
+    if (bytes == null) {
+      throw new NumberFormatException("String is null");
+    }
+    if (radix < Character.MIN_RADIX || radix > Character.MAX_RADIX) {
+      throw new NumberFormatException("Invalid radix: " + radix);
+    }
+    if (start < 0 || length <= 0 || bytes.writerIndex() < start + length) {
+      throw new NumberFormatException("Empty string or Invalid buffer!");
+    }
+
+    if (!bytes.hasMemoryAddress()) {
+      // memoryAddress() is unsupported for such buffers (e.g. heap buffers), so use the
+      // byte[] parser, keeping the caller's radix.
+      if (bytes.hasArray()) {
+        // array() is the whole backing array; this buffer's index 0 is at arrayOffset().
+        return parseInt(bytes.array(), bytes.arrayOffset() + start, length, radix);
+      }
+      byte[] copy = new byte[length];
+      bytes.getBytes(start, copy);
+      return parseInt(copy, 0, length, radix);
+    }
+
+    long memoryAddress = bytes.memoryAddress();
+
+    int offset = start;
+    byte first = PlatformDependent.getByte(memoryAddress + start);
+    boolean negative = first == '-';
+    if (negative || first == '+') {
+      offset++;
+      // A lone sign has no digits.
+      if (length == 1) {
+        throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset()));
+      }
+    }
+
+    return parseIntInternal(bytes, memoryAddress, start, length, offset, radix, negative);
+  }
+
+  /**
+   * Parses the bytes at addresses {@code memoryAddress + [offset, start + length)} as the
+   * digits of an int in the given radix, after the caller has consumed an optional sign.
+   * <p>
+   * Digits are accumulated as a negative number because the negative range of int is one
+   * larger than the positive range; this lets {@code Integer.MIN_VALUE} be parsed without
+   * overflow. A '.' ends the integral part: the value is truncated toward zero, and the
+   * remaining bytes are only checked to be valid digits.
+   *
+   * @param bytes         the string byte buffer; only used to build error messages here
+   * @param memoryAddress the value of {@code bytes.memoryAddress()} supplied by the caller
+   * @param start         absolute index of the first byte of the whole text (including any sign)
+   * @param length        number of bytes in the whole text (including any sign)
+   * @param offset        the starting position after the sign (if exists)
+   * @param radix         the base to use for conversion.
+   * @param negative      whether the number is negative.
+   * @return the value represented by the argument
+   * @throws NumberFormatException if the argument could not be parsed as an int quantity.
+   */
+  private static int parseIntInternal(ByteBuf bytes, long memoryAddress, int start, int length,
int offset,
+                                      int radix, boolean negative) {
+    byte separator = '.';
+    // Any result below max would overflow when multiplied by radix.
+    int max = Integer.MIN_VALUE / radix;
+    int result = 0, end = start + length;
+    while (offset < end) {
+      // digit(...) is expected to return -1 for bytes that are not digits in this radix.
+      int digit = digit(PlatformDependent.getByte(memoryAddress + offset++), radix);
+      if (digit == -1) {
+        if (PlatformDependent.getByte(memoryAddress + offset - 1) == separator) {
+          // We allow decimals and will return a truncated integer in that case.
+          // Therefore we won't throw an exception here (checking the fractional
+          // part happens below.)
+          break;
+        }
+        throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset()));
+      }
+      if (max > result) {
+        throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset()));
+      }
+      // Subtracting the digit may wrap past Integer.MIN_VALUE; a wrapped value is larger.
+      int next = result * radix - digit;
+      if (next > result) {
+        throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset()));
+      }
+      result = next;
+    }
+
+    // This is the case when we've encountered a decimal separator. The fractional
+    // part will not change the number, but we will verify that the fractional part
+    // is well formed.
+    while (offset < end) {
+      int digit = digit(PlatformDependent.getByte(memoryAddress + offset++), radix);
+      if (digit == -1) {
+        throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset()));
+      }
+    }
+
+    // Negating Integer.MIN_VALUE leaves it negative, which signals positive overflow.
+    if (!negative) {
+      result = -result;
+      if (result < 0) {
+        throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset()));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Parses the readable region of {@code bytes} (starting at its reader index and spanning
+   * {@code readableBytes()} bytes) as a base-10 long.
+   *
+   * @param bytes the buffer holding the text of a long quantity
+   * @return long the value represented by the readable bytes
+   * @throws NumberFormatException if the readable bytes could not be parsed as a long quantity.
+   */
+  public static long parseLong(ByteBuf bytes) {
+    int from = bytes.readerIndex();
+    int count = bytes.readableBytes();
+    return parseLong(bytes, from, count);
+  }
+
+  /**
+   * Parses {@code length} bytes of {@code bytes}, starting at absolute index {@code start},
+   * as a base-10 long value and returns the result.
+   *
+   * @param bytes  the buffer holding the text of a long quantity
+   * @param start  absolute index of the first byte of the text
+   * @param length number of bytes making up the text
+   * @return long the value represented by the argument
+   * @throws NumberFormatException if the argument could not be parsed as a long quantity.
+   */
+  public static long parseLong(ByteBuf bytes, int start, int length) {
+    return parseLong(bytes, start, length, 10);
+  }
+
+  /**
+   * Parses {@code length} bytes of {@code bytes}, starting at absolute index {@code start},
+   * as a long value in the given radix. An optional leading '-' or '+' is accepted, and a
+   * fractional part introduced by '.' is truncated toward zero.
+   * <p>
+   * Buffers exposing a memory address are read directly; other buffers are handed to
+   * {@link #parseLong(byte[], int, int, int)} with the same radix (heap buffers via their
+   * backing array, others via a temporary copy).
+   *
+   * @param bytes  the buffer holding the text of a long quantity
+   * @param start  absolute index of the first byte of the text
+   * @param length number of bytes making up the text; must be positive
+   * @param radix  the base to use for conversion.
+   * @return the value represented by the argument
+   * @throws NumberFormatException if the argument could not be parsed as a long quantity.
+   */
+  public static long parseLong(ByteBuf bytes, int start, int length, int radix) {
+    if (bytes == null) {
+      throw new NumberFormatException("String is null");
+    }
+    if (radix < Character.MIN_RADIX || radix > Character.MAX_RADIX) {
+      throw new NumberFormatException("Invalid radix: " + radix);
+    }
+    if (start < 0 || length <= 0 || bytes.writerIndex() < start + length) {
+      throw new NumberFormatException("Empty string or Invalid buffer!");
+    }
+
+    if (!bytes.hasMemoryAddress()) {
+      // memoryAddress() is unsupported for such buffers (e.g. heap buffers). Fall back to the
+      // byte[] *long* parser so values outside the int range still parse.
+      if (bytes.hasArray()) {
+        // array() is the whole backing array; this buffer's index 0 is at arrayOffset().
+        return parseLong(bytes.array(), bytes.arrayOffset() + start, length, radix);
+      }
+      byte[] copy = new byte[length];
+      bytes.getBytes(start, copy);
+      return parseLong(copy, 0, length, radix);
+    }
+
+    long memoryAddress = bytes.memoryAddress();
+
+    int offset = start;
+    byte first = PlatformDependent.getByte(memoryAddress + start);
+    boolean negative = first == '-';
+    if (negative || first == '+') {
+      offset++;
+      // A lone sign has no digits.
+      if (length == 1) {
+        throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset()));
+      }
+    }
+
+    return parseLongInternal(bytes, memoryAddress, start, length, offset, radix, negative);
+  }
+
+  /**
+   * Parses the bytes at addresses {@code memoryAddress + [offset, start + length)} as the
+   * digits of a long in the given radix, after the caller has consumed an optional sign.
+   * <p>
+   * Digits are accumulated as a negative number because the negative range of long is one
+   * larger than the positive range; this lets {@code Long.MIN_VALUE} be parsed without
+   * overflow. A '.' ends the integral part: the value is truncated toward zero, and the
+   * remaining bytes are only checked to be valid digits.
+   *
+   * @param bytes         the string byte buffer; only used to build error messages here
+   * @param memoryAddress the value of {@code bytes.memoryAddress()} supplied by the caller
+   * @param start         absolute index of the first byte of the whole text (including any sign)
+   * @param length        number of bytes in the whole text (including any sign)
+   * @param offset        the starting position after the sign (if exists)
+   * @param radix         the base to use for conversion.
+   * @param negative      whether the number is negative.
+   * @return the value represented by the argument
+   * @throws NumberFormatException if the argument could not be parsed as a long quantity.
+   */
+  private static long parseLongInternal(ByteBuf bytes, long memoryAddress, int start, int
length, int offset,
+                                        int radix, boolean negative) {
+    byte separator = '.';
+    // Any result below max would overflow when multiplied by radix.
+    long max = Long.MIN_VALUE / radix;
+    long result = 0, end = start + length;
+    while (offset < end) {
+      // digit(...) is expected to return -1 for bytes that are not digits in this radix.
+      int digit = digit(PlatformDependent.getByte(memoryAddress + offset++), radix);
+      if (digit == -1 || max > result) {
+        if (PlatformDependent.getByte(memoryAddress + offset - 1) == separator) {
+          // We allow decimals and will return a truncated integer in that case.
+          // Therefore we won't throw an exception here (checking the fractional
+          // part happens below.)
+          break;
+        }
+        throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset()));
+      }
+      // Subtracting the digit may wrap past Long.MIN_VALUE; a wrapped value is larger.
+      long next = result * radix - digit;
+      if (next > result) {
+        throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset()));
+      }
+      result = next;
+    }
+
+    // This is the case when we've encountered a decimal separator. The fractional
+    // part will not change the number, but we will verify that the fractional part
+    // is well formed.
+    while (offset < end) {
+      int digit = digit(PlatformDependent.getByte(memoryAddress + offset++), radix);
+      if (digit == -1) {
+        throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset()));
+      }
+    }
+
+    // Negating Long.MIN_VALUE leaves it negative, which signals positive overflow.
+    if (!negative) {
+      result = -result;
+      if (result < 0) {
+        throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset()));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Converts {@code value} into an instance of {@code numberClazz} by reflectively invoking
+   * its public {@code (String)} constructor, e.g. {@code numberValue(Integer.class, "12")}.
+   *
+   * @param numberClazz the target {@link Number} subclass; may be null
+   * @param value       the text to convert; may be null
+   * @return the converted number, or {@code null} if either argument is null, the class is
+   *         not a Number subclass, it has no public (String) constructor, or the constructor
+   *         fails (for example with a NumberFormatException)
+   */
   public static Number numberValue(Class<?> numberClazz, String value) {
     Number returnNumber = null;
-    
+
-    if (numberClazz == null && value == null) {
+    // Either null argument means there is nothing to convert. A null class must be caught
+    // here because Number.class.isAssignableFrom(null) throws NullPointerException.
+    if (numberClazz == null || value == null) {
       return returnNumber;
     }
-    
+
     if (Number.class.isAssignableFrom(numberClazz)) {
       try {
         Constructor<?> constructor = numberClazz.getConstructor(String.class);
         returnNumber = (Number) constructor.newInstance(value);
-      } catch (Exception ignored) {   
+      } catch (Exception ignored) {
+        // Best effort by design: any reflection or conversion failure yields null.
       }
     }
-    
+
     return returnNumber;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/c193cfaa/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index dbf8435..68d89e7 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -134,15 +133,7 @@ public class DelimitedTextFile {
         this.stats = new TableStatistics(this.schema);
       }
 
-      try {
-        // we need to discuss the De/Serializer interface. so custom serde is to disable
-        String serdeClass = this.meta.getOption(StorageConstants.TEXTFILE_SERDE,
-            TextFieldSerializerDeserializer.class.getName());
-        serde = (TextFieldSerializerDeserializer) ReflectionUtils.newInstance(Class.forName(serdeClass),
conf);
-      } catch (Throwable e) {
-        LOG.error(e.getMessage(), e);
-        throw new IOException(e);
-      }
+      serde = new TextFieldSerializerDeserializer();
 
       if (os == null) {
         os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
@@ -314,15 +305,7 @@ public class DelimitedTextFile {
         targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
       }
 
-      try {
-        // we need to discuss the De/Serializer interface. so custom serde is to disable
-        String serdeClass = this.meta.getOption(StorageConstants.TEXTFILE_SERDE,
-            TextFieldSerializerDeserializer.class.getName());
-        serde = (TextFieldSerializerDeserializer) ReflectionUtils.newInstance(Class.forName(serdeClass), conf);
-      } catch (Throwable e) {
-        LOG.error(e.getMessage(), e);
-        throw new IOException(e);
-      }
+      serde = new TextFieldSerializerDeserializer();
 
       super.init();
       Arrays.sort(targetColumnIndexes);
@@ -410,8 +393,8 @@ public class DelimitedTextFile {
         }
 
         if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
-          Datum datum = serde.deserialize(lineBuf.slice(start, fieldLength),
-              schema.getColumn(currentIndex), currentIndex, nullChars);
+          lineBuf.setIndex(start, start + fieldLength);
+          Datum datum = serde.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
           dst.put(currentIndex, datum);
           currentTarget++;
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/c193cfaa/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
index 0057b54..9722959 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
@@ -32,12 +32,14 @@ import org.apache.tajo.util.NumberUtil;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.charset.CharsetDecoder;
 
 //Compatibility with Apache Hive
 public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer {
   public static final byte[] trueBytes = "true".getBytes();
   public static final byte[] falseBytes = "false".getBytes();
   private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+  private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8);
 
   private static boolean isNull(ByteBuf val, ByteBuf nullBytes) {
     return !val.isReadable() || nullBytes.equals(val);
@@ -122,7 +124,7 @@ public class TextFieldSerializerDeserializer implements FieldSerializerDeseriali
   }
 
   @Override
-  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException{
+  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException {
     Datum datum;
     TajoDataTypes.Type type = col.getDataType().getType();
     boolean nullField;
@@ -141,41 +143,30 @@ public class TextFieldSerializerDeserializer implements FieldSerializerDeseriali
           datum = DatumFactory.createBool(bool == 't' || bool == 'T');
           break;
         case BIT:
-          datum = DatumFactory.createBit(Byte.parseByte(buf.toString(CharsetUtil.UTF_8)));
+          datum = DatumFactory.createBit(Byte.parseByte(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()));
           break;
         case CHAR:
-          datum = DatumFactory.createChar(buf.toString(CharsetUtil.UTF_8).trim());
+          datum = DatumFactory.createChar(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString().trim());
           break;
         case INT1:
-        case INT2: {
-          //TODO zero-copy
-          byte[] bytes = new byte[buf.readableBytes()];
-          buf.readBytes(bytes);
-          datum = DatumFactory.createInt2((short) NumberUtil.parseInt(bytes, 0, bytes.length));
+        case INT2:
+          datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf));
           break;
-        }
-        case INT4: {
-          //TODO zero-copy
-          byte[] bytes = new byte[buf.readableBytes()];
-          buf.readBytes(bytes);
-          datum = DatumFactory.createInt4(NumberUtil.parseInt(bytes, 0, bytes.length));
+        case INT4:
+          datum = DatumFactory.createInt4(NumberUtil.parseInt(buf));
           break;
-        }
         case INT8:
-          //TODO zero-copy
-          datum = DatumFactory.createInt8(buf.toString(CharsetUtil.UTF_8));
+          datum = DatumFactory.createInt8(NumberUtil.parseLong(buf));
           break;
         case FLOAT4:
-          //TODO zero-copy
-          datum = DatumFactory.createFloat4(buf.toString(CharsetUtil.UTF_8));
+          datum = DatumFactory.createFloat4(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
           break;
-        case FLOAT8: {
-          //TODO zero-copy
-          byte[] bytes = new byte[buf.readableBytes()];
-          buf.readBytes(bytes);
-          datum = DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, 0, bytes.length));
+        case FLOAT8:
+          datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf));
           break;
-        }
         case TEXT: {
           byte[] bytes = new byte[buf.readableBytes()];
           buf.readBytes(bytes);
@@ -183,16 +174,20 @@ public class TextFieldSerializerDeserializer implements FieldSerializerDeseriali
           break;
         }
         case DATE:
-          datum = DatumFactory.createDate(buf.toString(CharsetUtil.UTF_8));
+          datum = DatumFactory.createDate(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
           break;
         case TIME:
-          datum = DatumFactory.createTime(buf.toString(CharsetUtil.UTF_8));
+          datum = DatumFactory.createTime(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
           break;
         case TIMESTAMP:
-          datum = DatumFactory.createTimestamp(buf.toString(CharsetUtil.UTF_8));
+          datum = DatumFactory.createTimestamp(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
           break;
         case INTERVAL:
-          datum = DatumFactory.createInterval(buf.toString(CharsetUtil.UTF_8));
+          datum = DatumFactory.createInterval(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
           break;
         case PROTOBUF: {
           ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
@@ -209,7 +204,8 @@ public class TextFieldSerializerDeserializer implements FieldSerializerDeseriali
           break;
         }
         case INET4:
-          datum = DatumFactory.createInet4(buf.toString(CharsetUtil.UTF_8));
+          datum = DatumFactory.createInet4(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
           break;
         case BLOB: {
           byte[] bytes = new byte[buf.readableBytes()];


Mime
View raw message