Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8B692176A6 for ; Wed, 18 Mar 2015 17:25:53 +0000 (UTC) Received: (qmail 30786 invoked by uid 500); 18 Mar 2015 17:25:40 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 30695 invoked by uid 500); 18 Mar 2015 17:25:40 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 30569 invoked by uid 99); 18 Mar 2015 17:25:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Mar 2015 17:25:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 35778E18D2; Wed, 18 Mar 2015 17:25:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: jihoonson@apache.org To: commits@tajo.apache.org Date: Wed, 18 Mar 2015 17:25:45 -0000 Message-Id: <4f809f66471e4289bb97f9aeeae39b26@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/13] tajo git commit: TAJO-1381: Support multi-bytes delimiter for Text file TAJO-1381: Support multi-bytes delimiter for Text file Closes #410 Signed-off-by: Jinho Kim Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/82d44af3 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/82d44af3 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/82d44af3 Branch: refs/heads/index_support Commit: 82d44af32246c63a32c049292f0a229f16e85768 Parents: 286b956 Author: navis.ryu Authored: Wed Mar 11 08:49:31 2015 +0900 
Committer: Jinho Kim Committed: Wed Mar 18 11:47:57 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 ++ .../tajo/engine/query/TestSelectQuery.java | 24 +++++++++++ .../multibytes_delimiter_table3_ddl.sql | 3 ++ .../multibytes_delimiter_table4_ddl.sql | 3 ++ .../testMultiBytesDelimiter3.sql | 1 + .../testMultiBytesDelimiter4.sql | 1 + .../testMultiBytesDelimiter3.result | 7 +++ .../testMultiBytesDelimiter4.result | 7 +++ .../java/org/apache/tajo/storage/CSVFile.java | 11 +++-- .../tajo/storage/text/CSVLineDeserializer.java | 14 ++++-- .../apache/tajo/storage/text/CSVLineSerDe.java | 5 ++- .../tajo/storage/text/CSVLineSerializer.java | 8 ++-- .../tajo/storage/text/DelimitedTextFile.java | 2 +- .../tajo/storage/text/FieldSplitProcessor.java | 8 +--- .../text/MultiBytesFieldSplitProcessor.java | 45 ++++++++++++++++++++ .../tajo/storage/text/TextLineDeserializer.java | 6 +-- .../apache/tajo/storage/text/TextLineSerDe.java | 3 +- .../apache/tajo/storage/TestSplitProcessor.java | 38 +++++++++++++++-- 18 files changed, 162 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 0d7222f..c3f2691 100644 --- a/CHANGES +++ b/CHANGES @@ -9,6 +9,9 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1381: Support multi-bytes delimiter for Text file. + (Contributed by navis, Committed by jinho) + TAJO-1391: RpcConnectionPool should check reference counter of connection before close. 
(Contributed by navis, Committed by jihun) http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java index dd93dd1..f7b1382 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java @@ -659,4 +659,28 @@ public class TestSelectQuery extends QueryTestCaseBase { executeString("DROP TABLE table2"); } } + + @Test + public void testMultiBytesDelimiter3() throws Exception { + executeDDL("multibytes_delimiter_table3_ddl.sql", "multibytes_delimiter1"); + try { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("DROP TABLE table1"); + } + } + + @Test + public void testMultiBytesDelimiter4() throws Exception { + executeDDL("multibytes_delimiter_table4_ddl.sql", "multibytes_delimiter2"); + try { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("DROP TABLE table2"); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql new file mode 100644 index 0000000..8309d11 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql @@ -0,0 +1,3 @@ +create external table table1 (id int, name text, score float, type text) using 
text +with ('text.delimiter'='||', 'text.null'='NULL') location ${table.path}; + http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql new file mode 100644 index 0000000..2fb821a --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql @@ -0,0 +1,3 @@ +create external table table2 (id int, name text, score float, type text) using text +with ('text.delimiter'='ㅎ', 'text.null'='NULL') location ${table.path}; + http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql new file mode 100644 index 0000000..bd6b02d --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql @@ -0,0 +1 @@ +select * from table1; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql new file mode 100644 index 0000000..66a69ec --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql @@ -0,0 +1 @@ +select * from table2; \ No 
newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result new file mode 100644 index 0000000..d8d43b1 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result @@ -0,0 +1,7 @@ +id,name,score,type +------------------------------- +1,ooo,1.1,a +2,ppp,2.3, +3,qqq,null, +4,,4.5, +null,xxx,5.6,e \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result new file mode 100644 index 0000000..d8d43b1 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result @@ -0,0 +1,7 @@ +id,name,score,type +------------------------------- +1,ooo,1.1,a +2,ppp,2.3, +3,qqq,null, +4,,4.5, +null,xxx,5.6,e \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java index bb628b1..c1047d9 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java +++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java @@ -40,6 +40,7 @@ import org.apache.tajo.storage.compress.CodecPool; import org.apache.tajo.storage.exception.AlreadyExistsStorageException; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; +import org.apache.tajo.util.Bytes; import org.apache.tajo.util.BytesUtils; import java.io.*; @@ -83,7 +84,8 @@ public class CSVFile { this.meta = meta; this.schema = schema; this.delimiter = StringEscapeUtils.unescapeJava( - this.meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER)).getBytes(); + this.meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER)) + .getBytes(Bytes.UTF8_CHARSET); this.columnNum = schema.size(); @@ -93,7 +95,7 @@ public class CSVFile { if (StringUtils.isEmpty(nullCharacters)) { nullChars = NullDatum.get().asTextBytes(); } else { - nullChars = nullCharacters.getBytes(); + nullChars = nullCharacters.getBytes(Bytes.UTF8_CHARSET); } } @@ -265,7 +267,8 @@ public class CSVFile { //Delimiter this.delimiter = StringEscapeUtils.unescapeJava( meta.getOption(StorageConstants.TEXT_DELIMITER, - meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER))).getBytes(); + meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER))) + .getBytes(Bytes.UTF8_CHARSET); String nullCharacters = StringEscapeUtils.unescapeJava( meta.getOption(StorageConstants.TEXT_NULL, @@ -274,7 +277,7 @@ public class CSVFile { if (StringUtils.isEmpty(nullCharacters)) { nullChars = NullDatum.get().asTextBytes(); } else { - nullChars = nullCharacters.getBytes(); + nullChars = nullCharacters.getBytes(Bytes.UTF8_CHARSET); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java 
---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java index 1599f62..6a8c7a9 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java @@ -19,6 +19,7 @@ package org.apache.tajo.storage.text; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufProcessor; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.datum.Datum; @@ -28,9 +29,10 @@ import org.apache.tajo.storage.Tuple; import java.io.IOException; public class CSVLineDeserializer extends TextLineDeserializer { - private FieldSplitProcessor processor; + private ByteBufProcessor processor; private FieldSerializerDeserializer fieldSerDer; private ByteBuf nullChars; + private int delimiterCompensation; public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { super(schema, meta, targetColumnIndexes); @@ -38,7 +40,13 @@ public class CSVLineDeserializer extends TextLineDeserializer { @Override public void init() { - this.processor = new FieldSplitProcessor(CSVLineSerDe.getFieldDelimiter(meta)); + byte[] delimiter = CSVLineSerDe.getFieldDelimiter(meta); + if (delimiter.length == 1) { + this.processor = new FieldSplitProcessor(delimiter[0]); + } else { + this.processor = new MultiBytesFieldSplitProcessor(delimiter); + } + this.delimiterCompensation = delimiter.length - 1; if (nullChars != null) { nullChars.release(); @@ -67,7 +75,7 @@ public class CSVLineDeserializer extends TextLineDeserializer { if (end < 0) { fieldLength = rowLength - start; } else { - fieldLength = end - start; + fieldLength = end - start - delimiterCompensation; } if 
(projection.length > currentTarget && currentIndex == projection[currentTarget]) { http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java index 2fe7f23..988d5d1 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java @@ -22,6 +22,7 @@ import org.apache.commons.lang.StringEscapeUtils; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.util.Bytes; public class CSVLineSerDe extends TextLineSerDe { @Override @@ -34,8 +35,8 @@ public class CSVLineSerDe extends TextLineSerDe { return new CSVLineSerializer(schema, meta); } - public static char getFieldDelimiter(TableMeta meta) { + public static byte[] getFieldDelimiter(TableMeta meta) { return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER, - StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0); + StorageConstants.DEFAULT_FIELD_DELIMITER)).getBytes(Bytes.UTF8_CHARSET); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java index 53a0ef3..9a2fe37 100644 --- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java @@ -30,8 +30,8 @@ import java.io.OutputStream; public class CSVLineSerializer extends TextLineSerializer { private FieldSerializerDeserializer serde; - private byte [] nullChars; - private char delimiter; + private byte[] nullChars; + private byte[] delimiter; private int columnNum; public CSVLineSerializer(Schema schema, TableMeta meta) { @@ -56,8 +56,8 @@ public class CSVLineSerializer extends TextLineSerializer { writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars); if (columnNum - 1 > i) { - out.write((byte) delimiter); - writtenBytes += 1; + out.write(delimiter); + writtenBytes += delimiter.length; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index ebf9608..4c9234e 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -391,7 +391,7 @@ public class DelimitedTextFile { try { deserializer.deserialize(buf, tuple); - // if a line is read normaly, it exists this loop. + // if a line is read normally, it exits this loop.
break; } catch (TextLineParsingError tae) { http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java index a5ac142..862b5ae 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java @@ -21,9 +21,9 @@ package org.apache.tajo.storage.text; import io.netty.buffer.ByteBufProcessor; public class FieldSplitProcessor implements ByteBufProcessor { - private char delimiter; //the ascii separate character + private byte delimiter; // the single-byte field separator - public FieldSplitProcessor(char recordDelimiterByte) { + public FieldSplitProcessor(byte recordDelimiterByte) { this.delimiter = recordDelimiterByte; } @@ -31,8 +31,4 @@ public class FieldSplitProcessor implements ByteBufProcessor { public boolean process(byte value) throws Exception { return delimiter != value; } - - public char getDelimiter() { - return delimiter; - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java new file mode 100644 index 0000000..b97d7c6 --- /dev/null +++
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBufProcessor;
+
+/**
+ * Locates a multi-byte field delimiter while {@link io.netty.buffer.ByteBuf#forEachByte}
+ * feeds it one byte at a time.
+ *
+ * <p>A mismatch after a partial match falls back through a Knuth-Morris-Pratt failure table
+ * instead of restarting from zero. A plain restart would miss the delimiter {@code "|,"} in
+ * {@code "x||,y"}, because the second {@code '|'} would be discarded rather than treated as
+ * the start of a new match.
+ *
+ * <p>{@link #process(byte)} returns {@code false} on the LAST byte of a delimiter, so the
+ * field ends {@code delimiter.length - 1} bytes before the index reported by
+ * {@code forEachByte}. Matches do not overlap: matching restarts after every complete
+ * delimiter.
+ *
+ * <p>Instances are stateful and not thread-safe. A partial match still pending when a scan
+ * stops is carried into the next scan unless {@link #reset()} is called first.
+ */
+public class MultiBytesFieldSplitProcessor implements ByteBufProcessor {
+
+  private final byte[] delimiter;
+  /** fallback[i]: longest proper prefix of delimiter[0..i] that is also a suffix of it. */
+  private final int[] fallback;
+  /** Number of delimiter bytes matched so far. */
+  private int index;
+
+  /**
+   * Creates a processor for the given delimiter.
+   *
+   * @param recordDelimiterByte the delimiter bytes; must not be empty
+   * @throws IllegalArgumentException if {@code recordDelimiterByte} is empty
+   */
+  public MultiBytesFieldSplitProcessor(byte[] recordDelimiterByte) {
+    if (recordDelimiterByte.length == 0) {
+      throw new IllegalArgumentException("delimiter must not be empty");
+    }
+    this.delimiter = recordDelimiterByte.clone();
+    this.fallback = buildFallback(this.delimiter);
+  }
+
+  /** Builds the Knuth-Morris-Pratt failure table for {@code pattern}. */
+  private static int[] buildFallback(byte[] pattern) {
+    int[] table = new int[pattern.length];
+    int matched = 0;
+    for (int i = 1; i < pattern.length; i++) {
+      while (matched > 0 && pattern[i] != pattern[matched]) {
+        matched = table[matched - 1];
+      }
+      if (pattern[i] == pattern[matched]) {
+        matched++;
+      }
+      table[i] = matched;
+    }
+    return table;
+  }
+
+  /** Discards any partially matched delimiter prefix so the next scan starts fresh. */
+  public void reset() {
+    index = 0;
+  }
+
+  /**
+   * Consumes one byte of input.
+   *
+   * @return {@code false} if {@code value} completes the delimiter, {@code true} otherwise
+   */
+  @Override
+  public boolean process(byte value) throws Exception {
+    // On mismatch, fall back to the longest delimiter prefix that is still matched.
+    while (index > 0 && delimiter[index] != value) {
+      index = fallback[index - 1];
+    }
+    if (delimiter[index] == value) {
+      index++;
+    }
+    if (index < delimiter.length) {
+      return true;
+    }
+    index = 0;
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java index 7ebfa79..89a7de9 100644 ---
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java @@ -29,9 +29,9 @@ import java.io.IOException; * Reads a text line and fills a Tuple with values */ public abstract class TextLineDeserializer { - protected Schema schema; - protected TableMeta meta; - protected int [] targetColumnIndexes; + protected final Schema schema; + protected final TableMeta meta; + protected final int[] targetColumnIndexes; public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes) { this.schema = schema; http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java index e81e289..1a53bb0 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java @@ -26,6 +26,7 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.storage.BufferPool; import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.util.Bytes; /** * Pluggable Text Line SerDe class @@ -56,7 +57,7 @@ public abstract class TextLineSerDe { if (StringUtils.isEmpty(nullCharacters)) { nullChars = NullDatum.get().asTextBytes(); } else { - nullChars = nullCharacters.getBytes(); + nullChars = nullCharacters.getBytes(Bytes.UTF8_CHARSET); } return nullChars; 
http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java index 12ea551..2174d62 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java @@ -19,10 +19,12 @@ package org.apache.tajo.storage; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufProcessor; import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil; import org.apache.tajo.storage.text.FieldSplitProcessor; import org.apache.tajo.storage.text.LineSplitProcessor; +import org.apache.tajo.storage.text.MultiBytesFieldSplitProcessor; import org.junit.Test; import java.io.IOException; @@ -35,17 +37,47 @@ public class TestSplitProcessor { @Test public void testFieldSplitProcessor() throws IOException { - String data = "abc||de"; + String data = "abc||de|"; final ByteBuf buf = releaseLater( Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1)); final int len = buf.readableBytes(); - FieldSplitProcessor processor = new FieldSplitProcessor('|'); + FieldSplitProcessor processor = new FieldSplitProcessor((byte)'|'); assertEquals(3, buf.forEachByte(0, len, processor)); assertEquals(4, buf.forEachByte(4, len - 4, processor)); - assertEquals(-1, buf.forEachByte(5, len - 5, processor)); + assertEquals(7, buf.forEachByte(5, len - 5, processor)); + assertEquals(-1, buf.forEachByte(8, len - 8, processor)); + } + + @Test + public void testMultiCharFieldSplitProcessor1() throws IOException { + String data = "abc||||de||"; + final ByteBuf buf = releaseLater( + Unpooled.copiedBuffer(data,
CharsetUtil.ISO_8859_1)); + + final int len = buf.readableBytes(); + ByteBufProcessor processor = new MultiBytesFieldSplitProcessor("||".getBytes(CharsetUtil.ISO_8859_1)); + + assertEquals(4, buf.forEachByte(0, len, processor)); + assertEquals(6, buf.forEachByte(5, len - 5, processor)); + assertEquals(10, buf.forEachByte(7, len - 7, processor)); + assertEquals(-1, buf.forEachByte(11, len - 11, processor)); + } + + @Test + public void testMultiCharFieldSplitProcessor2() throws IOException { + String data = "abcㅎㅎdeㅎ"; + final ByteBuf buf = releaseLater( + Unpooled.copiedBuffer(data, CharsetUtil.UTF_8)); + + final int len = buf.readableBytes(); + ByteBufProcessor processor = new MultiBytesFieldSplitProcessor("ㅎ".getBytes(CharsetUtil.UTF_8)); + assertEquals(5, buf.forEachByte(0, len, processor)); + assertEquals(8, buf.forEachByte(6, len - 6, processor)); + assertEquals(13, buf.forEachByte(9, len - 9, processor)); + assertEquals(-1, buf.forEachByte(14, len - 14, processor)); } @Test