Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DA3FA10C76 for ; Fri, 14 Nov 2014 04:56:00 +0000 (UTC) Received: (qmail 21767 invoked by uid 500); 14 Nov 2014 04:56:00 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 21673 invoked by uid 500); 14 Nov 2014 04:56:00 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 21533 invoked by uid 99); 14 Nov 2014 04:56:00 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Nov 2014 04:56:00 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 75FBB93AC47; Fri, 14 Nov 2014 04:56:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jihoonson@apache.org To: commits@tajo.apache.org Date: Fri, 14 Nov 2014 04:56:03 -0000 Message-Id: <9867074da72649e1b1be8998906b670c@git.apache.org> In-Reply-To: <6e08d6ab1d7340598719dd9ed9252969@git.apache.org> References: <6e08d6ab1d7340598719dd9ed9252969@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/7] tajo git commit: TAJO-1151: Implement the ByteBuffer-based De/Serializer. (jinho) TAJO-1151: Implement the ByteBuffer-based De/Serializer. (jinho) Closes #228 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c193cfaa Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c193cfaa Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c193cfaa Branch: refs/heads/index_support Commit: c193cfaa8eb50a83bbbf46624bcccb81c0cc7445 Parents: 55084a8 Author: jhkim Authored: Thu Nov 13 11:12:55 2014 +0900 Committer: jhkim Committed: Thu Nov 13 11:12:55 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + tajo-common/pom.xml | 4 + .../java/org/apache/tajo/util/NumberUtil.java | 700 +++++++++++++++++-- .../tajo/storage/text/DelimitedTextFile.java | 25 +- .../text/TextFieldSerializerDeserializer.java | 56 +- 5 files changed, 694 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/c193cfaa/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 7f5ec76..d255192 100644 --- a/CHANGES +++ b/CHANGES @@ -75,6 +75,8 @@ Release 0.9.1 - unreleased SUB TASKS + TAJO-1151: Implement the ByteBuffer-based De/Serializer. (jinho) + TAJO-1152: RawFile ByteBuffer should be reuse. (jinho) TAJO-1149: Implement direct read of DelimitedTextFile. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/c193cfaa/tajo-common/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-common/pom.xml b/tajo-common/pom.xml index 9bbd5a7..a98060d 100644 --- a/tajo-common/pom.xml +++ b/tajo-common/pom.xml @@ -216,6 +216,10 @@ gson + io.netty + netty-buffer + + junit junit test http://git-wip-us.apache.org/repos/asf/tajo/blob/c193cfaa/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java index 375a2e4..e3dd956 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java @@ -18,9 +18,15 @@ package org.apache.tajo.util; +import io.netty.buffer.ByteBuf; +import io.netty.util.internal.PlatformDependent; + +import java.io.IOException; +import java.io.OutputStream; import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; +import java.nio.charset.Charset; +// this is an implementation copied from LazyPrimitives in hive public class NumberUtil { public static final double[] powersOf10 = { /* Table giving binary powers of 10. Entry */ @@ -41,58 +47,35 @@ public class NumberUtil { */ public static long unsigned32(int n) { - return n & 0xFFFFFFFFL; - } - - public static int unsigned16(short n) { - return n & 0xFFFF; - } - - public static byte[] toAsciiBytes(Number i){ - return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray()); + return n & 0xFFFFFFFFL; } - public static byte[] toAsciiBytes(short i){ - return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray()); + public static int unsigned16(short n) { + return n & 0xFFFF; } - public static byte[] toAsciiBytes(int i){ + public static byte[] toAsciiBytes(Number i) { return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray()); } - public static byte[] toAsciiBytes(long i){ + public static byte[] toAsciiBytes(short i) { return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray()); } - public static byte[] toAsciiBytes(float i){ + public static byte[] toAsciiBytes(int i) { return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray()); } - public static byte[] toAsciiBytes(double i){ + public static byte[] toAsciiBytes(long i) { return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray()); } - private static void benchmark(int num){ - System.out.println("Start benchmark. # of :" + num); - long start = System.currentTimeMillis(); - - byte[] bytes; - long size = 0; - for (int i = 0; i < num; i ++){ - bytes = String.valueOf(i).getBytes(); - size += bytes.length; - } - - long end = System.currentTimeMillis(); - System.out.println("JDK getBytes() \t\t\t\t" + (end - start) + " ms, " + "Total: " + size / (1024 * 1024) + "MB"); - size = 0; + public static byte[] toAsciiBytes(float i) { + return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray()); + } - for (int i = 0; i < num; i ++){ - bytes = toAsciiBytes(i); - size += bytes.length; - } - System.out.println( "NumberUtil toByte() \t" + (System.currentTimeMillis() - end) - + " ms, " + "Total: " + size / (1024 * 1024) + "MB"); + public static byte[] toAsciiBytes(double i) { + return BytesUtils.toASCIIBytes(String.valueOf(i).toCharArray()); } /** @@ -324,7 +307,7 @@ public class NumberUtil { * int quantity. The second argument specifies the radix to use when parsing * the value. * - * @param radix the base to use for conversion. + * @param radix the base to use for conversion. * @return the value represented by the argument * @throws NumberFormatException if the argument could not be parsed as an int quantity. */ @@ -411,22 +394,655 @@ public class NumberUtil { } return result; } - + + /** + * Parses the string argument as if it was a long value and returns the + * result. Throws NumberFormatException if the string does not represent a + * long quantity. + * + * @param bytes + * @param start + * @param length a UTF-8 encoded string representation of a long quantity. + * @return long the value represented by the argument + * @throws NumberFormatException if the argument could not be parsed as a long quantity. + */ + public static long parseLong(byte[] bytes, int start, int length) { + return parseLong(bytes, start, length, 10); + } + + /** + * Parses the string argument as if it was an long value and returns the + * result. Throws NumberFormatException if the string does not represent an + * long quantity. The second argument specifies the radix to use when parsing + * the value. + * + * @param bytes + * @param start + * @param length a UTF-8 encoded string representation of a long quantity. + * @param radix the base to use for conversion. + * @return the value represented by the argument + * @throws NumberFormatException if the argument could not be parsed as an long quantity. + */ + public static long parseLong(byte[] bytes, int start, int length, int radix) { + if (bytes == null) { + throw new NumberFormatException("String is null"); + } + if (radix < Character.MIN_RADIX || radix > Character.MAX_RADIX) { + throw new NumberFormatException("Invalid radix: " + radix); + } + if (length == 0) { + throw new NumberFormatException("Empty string!"); + } + int offset = start; + boolean negative = bytes[start] == '-'; + if (negative || bytes[start] == '+') { + offset++; + if (length == 1) { + throw new NumberFormatException(new String(bytes, start, + length)); + } + } + + return parseLongInternal(bytes, start, length, offset, radix, negative); + } + + /** + * /** Parses the string argument as if it was an long value and returns the + * result. Throws NumberFormatException if the string does not represent an + * long quantity. The second argument specifies the radix to use when parsing + * the value. + * + * @param bytes + * @param start + * @param length a UTF-8 encoded string representation of a long quantity. + * @param offset the starting position after the sign (if exists) + * @param radix the base to use for conversion. + * @param negative whether the number is negative. + * @return the value represented by the argument + * @throws NumberFormatException if the argument could not be parsed as an long quantity. + */ + private static long parseLongInternal(byte[] bytes, int start, int length, int offset, + int radix, boolean negative) { + byte separator = '.'; + long max = Long.MIN_VALUE / radix; + long result = 0, end = start + length; + while (offset < end) { + int digit = digit(bytes[offset++], radix); + if (digit == -1 || max > result) { + if (bytes[offset - 1] == separator) { + // We allow decimals and will return a truncated integer in that case. + // Therefore we won't throw an exception here (checking the fractional + // part happens below.) + break; + } + throw new NumberFormatException(new String(bytes, start, + length)); + } + long next = result * radix - digit; + if (next > result) { + throw new NumberFormatException(new String(bytes, start, + length)); + } + result = next; + } + + // This is the case when we've encountered a decimal separator. The fractional + // part will not change the number, but we will verify that the fractional part + // is well formed. + while (offset < end) { + int digit = digit(bytes[offset++], radix); + if (digit == -1) { + throw new NumberFormatException(new String(bytes, start, + length)); + } + } + + if (!negative) { + result = -result; + if (result < 0) { + throw new NumberFormatException(new String(bytes, start, + length)); + } + } + return result; + } + + /** + * Writes out the text representation of an integer using base 10 to an + * OutputStream in UTF-8 encoding. + *

+ * Note: division by a constant (like 10) is much faster than division by a + * variable. That's one of the reasons that we don't make radix a parameter + * here. + * + * @param out the outputstream to write to + * @param i an int to write out + * @throws IOException + */ + public static void writeUTF8(OutputStream out, int i) throws IOException { + if (i == 0) { + out.write('0'); + return; + } + + boolean negative = i < 0; + if (negative) { + out.write('-'); + } else { + // negative range is bigger than positive range, so there is no risk + // of overflow here. + i = -i; + } + + int start = 1000000000; + while (i / start == 0) { + start /= 10; + } + + while (start > 0) { + out.write('0' - (i / start % 10)); + start /= 10; + } + } + + /** + * Writes out the text representation of an integer using base 10 to an + * OutputStream in UTF-8 encoding. + *

+ * Note: division by a constant (like 10) is much faster than division by a + * variable. That's one of the reasons that we don't make radix a parameter + * here. + * + * @param out the outputstream to write to + * @param i an int to write out + * @throws java.io.IOException + */ + public static void writeUTF8(OutputStream out, long i) throws IOException { + if (i == 0) { + out.write('0'); + return; + } + + boolean negative = i < 0; + if (negative) { + out.write('-'); + } else { + // negative range is bigger than positive range, so there is no risk + // of overflow here. + i = -i; + } + + long start = 1000000000000000000L; + while (i / start == 0) { + start /= 10; + } + + while (start > 0) { + out.write('0' - (int) ((i / start) % 10)); + start /= 10; + } + } + + /** + * Parses the byte array argument as if it was a double value and returns the + * result. Throws NumberFormatException if the byte buffer does not represent a + * double value. + * + * @return double, the value represented by the argument + * @throws NumberFormatException if the argument could not be parsed as a double + */ + public static double parseDouble(ByteBuf bytes) { + return parseDouble(bytes, bytes.readerIndex(), bytes.readableBytes()); + } + + /** + * Parses the byte array argument as if it was a double value and returns the + * result. Throws NumberFormatException if the byte buffer does not represent a + * double value. + * + * @return double, the value represented by the argument + * @throws NumberFormatException if the argument could not be parsed as a double + */ + public static double parseDouble(ByteBuf bytes, int start, int length) { + if (!PlatformDependent.hasUnsafe()) { + return parseDouble(bytes.array(), start, length); + } + + if (bytes == null) { + throw new NumberFormatException("String is null"); + } + + if (length == 0 || bytes.writerIndex() < start + length) { + throw new NumberFormatException("Empty string or Invalid buffer!"); + } + + + long memoryAddress = bytes.memoryAddress(); + /* + * Strip off leading blanks + */ + int offset = start; + int end = start + length; + + while (offset < end && PlatformDependent.getByte(memoryAddress + offset) == ' ') { + offset++; + } + if (offset == end) { + throw new NumberFormatException("blank byte array!"); + } + + /* + * check for a sign. + */ + boolean sign = false; + if (PlatformDependent.getByte(memoryAddress + offset) == '-') { + sign = true; + offset++; + } else if (PlatformDependent.getByte(memoryAddress + offset) == '+') { + offset++; + } + if (offset == end) { + throw new NumberFormatException("the byte array only has a sign!"); + } + + /* + * Count the number of digits in the mantissa (including the decimal + * point), and also locate the decimal point. + */ + int mantSize = 0; /* Number of digits in mantissa. */ + int decicalOffset = -1; /* Number of mantissa digits BEFORE decimal point. */ + for (; offset < end; offset++) { + if (!isDigit(PlatformDependent.getByte(memoryAddress + offset))) { + if ((PlatformDependent.getByte(memoryAddress + offset) != '.') || (decicalOffset >= 0)) { + break; + } + decicalOffset = mantSize; + } + mantSize++; + } + + int exponentOffset = offset; /* Temporarily holds location of exponent in bytes. */ + + /* + * Now suck up the digits in the mantissa. Use two integers to + * collect 9 digits each (this is faster than using floating-point). + * If the mantissa has more than 18 digits, ignore the extras, since + * they can't affect the value anyway. + */ + offset -= mantSize; + if (decicalOffset < 0) { + decicalOffset = mantSize; + } else { + mantSize -= 1; /* One of the digits was the decimal point. */ + } + int fracExponent; /* Exponent that derives from the fractional + * part. Under normal circumstatnces, it is + * the negative of the number of digits in F. + * However, if I is very long, the last digits + * of I get dropped (otherwise a long I with a + * large negative exponent could cause an + * unnecessary overflow on I alone). In this + * case, fracExp is incremented one for each + * dropped digit. */ + if (mantSize > 18) { + fracExponent = decicalOffset - 18; + mantSize = 18; + } else { + fracExponent = decicalOffset - mantSize; + } + + if (mantSize == 0) { + return 0.0; + } + + int frac1 = 0; + for (; mantSize > 9; mantSize--) { + int b = PlatformDependent.getByte(memoryAddress + offset); + offset++; + if (b == '.') { + b = PlatformDependent.getByte(memoryAddress + offset); + offset++; + } + frac1 = 10 * frac1 + (b - '0'); + } + int frac2 = 0; + for (; mantSize > 0; mantSize--) { + int b = PlatformDependent.getByte(memoryAddress + offset); + offset++; + if (b == '.') { + b = PlatformDependent.getByte(memoryAddress + offset); + offset++; + } + frac2 = 10 * frac2 + (b - '0'); + } + double fraction = (1.0e9 * frac1) + frac2; + + /* + * Skim off the exponent. + */ + int exponent = 0; /* Exponent read from "EX" field. */ + offset = exponentOffset; + boolean expSign = false; + + if (offset < end) { + if ((PlatformDependent.getByte(memoryAddress + offset) != 'E') + && (PlatformDependent.getByte(memoryAddress + offset) != 'e')) { + throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset())); + } + + // (bytes[offset] == 'E') || (bytes[offset] == 'e') + offset++; + + if (PlatformDependent.getByte(memoryAddress + offset) == '-') { + expSign = true; + offset++; + } else if (PlatformDependent.getByte(memoryAddress + offset) == '+') { + offset++; + } + + for (; offset < end; offset++) { + if (isDigit(PlatformDependent.getByte(memoryAddress + offset))) { + exponent = exponent * 10 + (PlatformDependent.getByte(memoryAddress + offset) - '0'); + } else { + throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset())); + } + } + } + + exponent = expSign ? (fracExponent - exponent) : (fracExponent + exponent); + + /* + * Generate a floating-point number that represents the exponent. + * Do this by processing the exponent one bit at a time to combine + * many powers of 2 of 10. Then combine the exponent with the + * fraction. + */ + if (exponent < 0) { + expSign = true; + exponent = -exponent; + } else { + expSign = false; + } + if (exponent > maxExponent) { + throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset())); + } + + double dblExp = 1.0; + for (int i = 0; exponent != 0; exponent >>= 1, i++) { + if ((exponent & 01) == 01) { + dblExp *= powersOf10[i]; + } + } + + fraction = (expSign) ? (fraction / dblExp) : (fraction * dblExp); + + return sign ? (-fraction) : fraction; + } + + + /** + * Parses the byte buffer argument as if it was an int value and returns the + * result. Throws NumberFormatException if the byte array does not represent an + * int quantity. + * + * @return int the value represented by the argument + * @throws NumberFormatException if the argument could not be parsed as an int quantity. + */ + public static int parseInt(ByteBuf bytes) { + return parseInt(bytes, bytes.readerIndex(), bytes.readableBytes()); + } + + /** + * Parses the byte buffer argument as if it was an int value and returns the + * result. Throws NumberFormatException if the byte array does not represent an + * int quantity. + * + * @return int the value represented by the argument + * @throws NumberFormatException if the argument could not be parsed as an int quantity. + */ + public static int parseInt(ByteBuf bytes, int start, int length) { + return parseInt(bytes, start, length, 10); + } + + /** + * Parses the byte buffer argument as if it was an int value and returns the + * result. Throws NumberFormatException if the byte array does not represent an + * int quantity. The second argument specifies the radix to use when parsing + * the value. + * + * @param radix the base to use for conversion. + * @return the value represented by the argument + * @throws NumberFormatException if the argument could not be parsed as an int quantity. + */ + public static int parseInt(ByteBuf bytes, int start, int length, int radix) { + if (!PlatformDependent.hasUnsafe()) { + return parseInt(bytes.array(), start, length); + } + + if (bytes == null) { + throw new NumberFormatException("String is null"); + } + if (radix < Character.MIN_RADIX || radix > Character.MAX_RADIX) { + throw new NumberFormatException("Invalid radix: " + radix); + } + if (length == 0 || bytes.writerIndex() < start + length) { + throw new NumberFormatException("Empty string or Invalid buffer!"); + } + + long memoryAddress = bytes.memoryAddress(); + + int offset = start; + boolean negative = PlatformDependent.getByte(memoryAddress + start) == '-'; + if (negative || PlatformDependent.getByte(memoryAddress + start) == '+') { + offset++; + if (length == 1) { + throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset())); + } + } + + return parseIntInternal(bytes, memoryAddress, start, length, offset, radix, negative); + } + + /** + * @param bytes the string byte buffer + * @param memoryAddress the offheap memory address + * @param start + * @param length + * @param radix the base to use for conversion. + * @param offset the starting position after the sign (if exists) + * @param radix the base to use for conversion. + * @param negative whether the number is negative. + * @return the value represented by the argument + * @throws NumberFormatException if the argument could not be parsed as an int quantity. + */ + private static int parseIntInternal(ByteBuf bytes, long memoryAddress, int start, int length, int offset, + int radix, boolean negative) { + byte separator = '.'; + int max = Integer.MIN_VALUE / radix; + int result = 0, end = start + length; + while (offset < end) { + int digit = digit(PlatformDependent.getByte(memoryAddress + offset++), radix); + if (digit == -1) { + if (PlatformDependent.getByte(memoryAddress + offset - 1) == separator) { + // We allow decimals and will return a truncated integer in that case. + // Therefore we won't throw an exception here (checking the fractional + // part happens below.) + break; + } + throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset())); + } + if (max > result) { + throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset())); + } + int next = result * radix - digit; + if (next > result) { + throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset())); + } + result = next; + } + + // This is the case when we've encountered a decimal separator. The fractional + // part will not change the number, but we will verify that the fractional part + // is well formed. + while (offset < end) { + int digit = digit(PlatformDependent.getByte(memoryAddress + offset++), radix); + if (digit == -1) { + throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset())); + } + } + + if (!negative) { + result = -result; + if (result < 0) { + throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset())); + } + } + return result; + } + + /** + * Parses the byte buffer argument as if it was a long value and returns the + * result. Throws NumberFormatException if the string does not represent a + * long quantity. + * + * @param bytes the string byte buffer + * @return long the value represented by the argument + * @throws NumberFormatException if the argument could not be parsed as a long quantity. + */ + public static long parseLong(ByteBuf bytes) { + return parseLong(bytes, bytes.readerIndex(), bytes.readableBytes()); + } + + /** + * Parses the byte buffer argument as if it was a long value and returns the + * result. Throws NumberFormatException if the string does not represent a + * long quantity. + * + * @param bytes the string byte buffer + * @param start + * @param length a UTF-8 encoded string representation of a long quantity. + * @return long the value represented by the argument + * @throws NumberFormatException if the argument could not be parsed as a long quantity. + */ + public static long parseLong(ByteBuf bytes, int start, int length) { + return parseLong(bytes, start, length, 10); + } + + /** + * Parses the byte buffer argument as if it was an long value and returns the + * result. Throws NumberFormatException if the string does not represent an + * long quantity. The second argument specifies the radix to use when parsing + * the value. + * + * @param bytes the string byte buffer + * @param start + * @param length a UTF-8 encoded string representation of a long quantity. + * @param radix the base to use for conversion. + * @return the value represented by the argument + * @throws NumberFormatException if the argument could not be parsed as an long quantity. + */ + public static long parseLong(ByteBuf bytes, int start, int length, int radix) { + if (!PlatformDependent.hasUnsafe()) { + return parseInt(bytes.array(), start, length); + } + + if (bytes == null) { + throw new NumberFormatException("String is null"); + } + if (radix < Character.MIN_RADIX || radix > Character.MAX_RADIX) { + throw new NumberFormatException("Invalid radix: " + radix); + } + if (length == 0 || bytes.writerIndex() < start + length) { + throw new NumberFormatException("Empty string or Invalid buffer!"); + } + + long memoryAddress = bytes.memoryAddress(); + + int offset = start; + boolean negative = PlatformDependent.getByte(memoryAddress + start) == '-'; + if (negative || PlatformDependent.getByte(memoryAddress + start) == '+') { + offset++; + if (length == 1) { + throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset())); + } + } + + return parseLongInternal(bytes, memoryAddress, start, length, offset, radix, negative); + } + + /** + * /** Parses the byte buffer argument as if it was an long value and returns the + * result. Throws NumberFormatException if the string does not represent an + * long quantity. The second argument specifies the radix to use when parsing + * the value. + * + * @param bytes the string byte buffer + * @param memoryAddress the offheap memory address + * @param start + * @param length a UTF-8 encoded string representation of a long quantity. + * @param offset the starting position after the sign (if exists) + * @param radix the base to use for conversion. + * @param negative whether the number is negative. + * @return the value represented by the argument + * @throws NumberFormatException if the argument could not be parsed as an long quantity. + */ + private static long parseLongInternal(ByteBuf bytes, long memoryAddress, int start, int length, int offset, + int radix, boolean negative) { + byte separator = '.'; + long max = Long.MIN_VALUE / radix; + long result = 0, end = start + length; + while (offset < end) { + int digit = digit(PlatformDependent.getByte(memoryAddress + offset++), radix); + if (digit == -1 || max > result) { + if (PlatformDependent.getByte(memoryAddress + offset - 1) == separator) { + // We allow decimals and will return a truncated integer in that case. + // Therefore we won't throw an exception here (checking the fractional + // part happens below.) + break; + } + throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset())); + } + long next = result * radix - digit; + if (next > result) { + throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset())); + } + result = next; + } + + // This is the case when we've encountered a decimal separator. The fractional + // part will not change the number, but we will verify that the fractional part + // is well formed. + while (offset < end) { + int digit = digit(PlatformDependent.getByte(memoryAddress + offset++), radix); + if (digit == -1) { + throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset())); + } + } + + if (!negative) { + result = -result; + if (result < 0) { + throw new NumberFormatException(bytes.toString(start, length, Charset.defaultCharset())); + } + } + return result; + } + public static Number numberValue(Class numberClazz, String value) { Number returnNumber = null; - + if (numberClazz == null && value == null) { return returnNumber; } - + if (Number.class.isAssignableFrom(numberClazz)) { try { Constructor constructor = numberClazz.getConstructor(String.class); returnNumber = (Number) constructor.newInstance(value); - } catch (Exception ignored) { + } catch (Exception ignored) { } } - + return returnNumber; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/c193cfaa/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index dbf8435..68d89e7 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -32,7 +32,6 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; @@ -134,15 +133,7 @@ public class DelimitedTextFile { this.stats = new TableStatistics(this.schema); } - try { - // we need to discuss the De/Serializer interface. so custom serde is to disable - String serdeClass = this.meta.getOption(StorageConstants.TEXTFILE_SERDE, - TextFieldSerializerDeserializer.class.getName()); - serde = (TextFieldSerializerDeserializer) ReflectionUtils.newInstance(Class.forName(serdeClass), conf); - } catch (Throwable e) { - LOG.error(e.getMessage(), e); - throw new IOException(e); - } + serde = new TextFieldSerializerDeserializer(); if (os == null) { os = new NonSyncByteArrayOutputStream(BUFFER_SIZE); @@ -314,15 +305,7 @@ public class DelimitedTextFile { targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName()); } - try { - // we need to discuss the De/Serializer interface. so custom serde is to disable - String serdeClass = this.meta.getOption(StorageConstants.TEXTFILE_SERDE, - TextFieldSerializerDeserializer.class.getName()); - serde = (TextFieldSerializerDeserializer) ReflectionUtils.newInstance(Class.forName(serdeClass), conf); - } catch (Throwable e) { - LOG.error(e.getMessage(), e); - throw new IOException(e); - } + serde = new TextFieldSerializerDeserializer(); super.init(); Arrays.sort(targetColumnIndexes); @@ -410,8 +393,8 @@ public class DelimitedTextFile { } if (projection.length > currentTarget && currentIndex == projection[currentTarget]) { - Datum datum = serde.deserialize(lineBuf.slice(start, fieldLength), - schema.getColumn(currentIndex), currentIndex, nullChars); + lineBuf.setIndex(start, start + fieldLength); + Datum datum = serde.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars); dst.put(currentIndex, datum); currentTarget++; } http://git-wip-us.apache.org/repos/asf/tajo/blob/c193cfaa/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java index 0057b54..9722959 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java @@ -32,12 +32,14 @@ import org.apache.tajo.util.NumberUtil; import java.io.IOException; import java.io.OutputStream; +import java.nio.charset.CharsetDecoder; //Compatibility with Apache Hive public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer { public static final byte[] trueBytes = "true".getBytes(); public static final byte[] falseBytes = "false".getBytes(); private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); + private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8); private static boolean isNull(ByteBuf val, ByteBuf nullBytes) { return !val.isReadable() || nullBytes.equals(val); @@ -122,7 +124,7 @@ public class TextFieldSerializerDeserializer implements FieldSerializerDeseriali } @Override - public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException{ + public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException { Datum datum; TajoDataTypes.Type type = col.getDataType().getType(); boolean nullField; @@ -141,41 +143,30 @@ public class TextFieldSerializerDeserializer implements FieldSerializerDeseriali datum = DatumFactory.createBool(bool == 't' || bool == 'T'); break; case BIT: - datum = DatumFactory.createBit(Byte.parseByte(buf.toString(CharsetUtil.UTF_8))); + datum = DatumFactory.createBit(Byte.parseByte( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString())); break; case CHAR: - datum = DatumFactory.createChar(buf.toString(CharsetUtil.UTF_8).trim()); + datum = DatumFactory.createChar( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString().trim()); break; case INT1: - case INT2: { - //TODO zero-copy - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - datum = DatumFactory.createInt2((short) NumberUtil.parseInt(bytes, 0, bytes.length)); + case INT2: + datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf)); break; - } - case INT4: { - //TODO zero-copy - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - datum = DatumFactory.createInt4(NumberUtil.parseInt(bytes, 0, bytes.length)); + case INT4: + datum = DatumFactory.createInt4(NumberUtil.parseInt(buf)); break; - } case INT8: - //TODO zero-copy - datum = DatumFactory.createInt8(buf.toString(CharsetUtil.UTF_8)); + datum = DatumFactory.createInt8(NumberUtil.parseLong(buf)); break; case FLOAT4: - //TODO zero-copy - datum = DatumFactory.createFloat4(buf.toString(CharsetUtil.UTF_8)); + datum = DatumFactory.createFloat4( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); break; - case FLOAT8: { - //TODO zero-copy - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - datum = DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, 0, bytes.length)); + case FLOAT8: + datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf)); break; - } case TEXT: { byte[] bytes = new byte[buf.readableBytes()]; buf.readBytes(bytes); @@ -183,16 +174,20 @@ public class TextFieldSerializerDeserializer implements FieldSerializerDeseriali break; } case DATE: - datum = DatumFactory.createDate(buf.toString(CharsetUtil.UTF_8)); + datum = DatumFactory.createDate( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); break; case TIME: - datum = DatumFactory.createTime(buf.toString(CharsetUtil.UTF_8)); + datum = DatumFactory.createTime( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); break; case TIMESTAMP: - datum = DatumFactory.createTimestamp(buf.toString(CharsetUtil.UTF_8)); + datum = DatumFactory.createTimestamp( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); break; case INTERVAL: - datum = DatumFactory.createInterval(buf.toString(CharsetUtil.UTF_8)); + datum = DatumFactory.createInterval( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); break; case PROTOBUF: { ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType()); @@ -209,7 +204,8 @@ public class TextFieldSerializerDeserializer implements FieldSerializerDeseriali break; } case INET4: - datum = DatumFactory.createInet4(buf.toString(CharsetUtil.UTF_8)); + datum = DatumFactory.createInet4( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); break; case BLOB: { byte[] bytes = new byte[buf.readableBytes()];