tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [06/13] tajo git commit: TAJO-1381: Support multi-bytes delimiter for Text file
Date Wed, 18 Mar 2015 17:25:45 GMT
TAJO-1381: Support multi-byte delimiters for text files

Closes #410

Signed-off-by: Jinho Kim <jhkim@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/82d44af3
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/82d44af3
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/82d44af3

Branch: refs/heads/index_support
Commit: 82d44af32246c63a32c049292f0a229f16e85768
Parents: 286b956
Author: navis.ryu <navis@apache.org>
Authored: Wed Mar 11 08:49:31 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Wed Mar 18 11:47:57 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../tajo/engine/query/TestSelectQuery.java      | 24 +++++++++++
 .../multibytes_delimiter_table3_ddl.sql         |  3 ++
 .../multibytes_delimiter_table4_ddl.sql         |  3 ++
 .../testMultiBytesDelimiter3.sql                |  1 +
 .../testMultiBytesDelimiter4.sql                |  1 +
 .../testMultiBytesDelimiter3.result             |  7 +++
 .../testMultiBytesDelimiter4.result             |  7 +++
 .../java/org/apache/tajo/storage/CSVFile.java   | 11 +++--
 .../tajo/storage/text/CSVLineDeserializer.java  | 14 ++++--
 .../apache/tajo/storage/text/CSVLineSerDe.java  |  5 ++-
 .../tajo/storage/text/CSVLineSerializer.java    |  8 ++--
 .../tajo/storage/text/DelimitedTextFile.java    |  2 +-
 .../tajo/storage/text/FieldSplitProcessor.java  |  8 +---
 .../text/MultiBytesFieldSplitProcessor.java     | 45 ++++++++++++++++++++
 .../tajo/storage/text/TextLineDeserializer.java |  6 +--
 .../apache/tajo/storage/text/TextLineSerDe.java |  3 +-
 .../apache/tajo/storage/TestSplitProcessor.java | 38 +++++++++++++++--
 18 files changed, 162 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0d7222f..c3f2691 100644
--- a/CHANGES
+++ b/CHANGES
@@ -9,6 +9,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1381: Support multi-bytes delimiter for Text file.
+    (Contributed by navis, Committed by jinho)
+
     TAJO-1391: RpcConnectionPool should check reference counter of connection 
     before close. (Contributed by navis, Committed by jihun)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
index dd93dd1..f7b1382 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
@@ -659,4 +659,28 @@ public class TestSelectQuery extends QueryTestCaseBase {
       executeString("DROP TABLE table2");
     }
   }
+
+  @Test
+  public void testMultiBytesDelimiter3() throws Exception {
+    executeDDL("multibytes_delimiter_table3_ddl.sql", "multibytes_delimiter1");
+    try {
+      ResultSet res = executeQuery();
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("DROP TABLE table1");
+    }
+  }
+
+  @Test
+  public void testMultiBytesDelimiter4() throws Exception {
+    executeDDL("multibytes_delimiter_table4_ddl.sql", "multibytes_delimiter2");
+    try {
+      ResultSet res = executeQuery();
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("DROP TABLE table2");
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql
new file mode 100644
index 0000000..8309d11
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql
@@ -0,0 +1,3 @@
+create external table table1 (id int, name text, score float, type text) using text
+with ('text.delimiter'='||', 'text.null'='NULL') location ${table.path};
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql
new file mode 100644
index 0000000..2fb821a
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql
@@ -0,0 +1,3 @@
+create external table table2 (id int, name text, score float, type text) using text
+with ('text.delimiter'='ㅎ', 'text.null'='NULL') location ${table.path};
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql
new file mode 100644
index 0000000..bd6b02d
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql
@@ -0,0 +1 @@
+select * from table1;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql
new file mode 100644
index 0000000..66a69ec
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql
@@ -0,0 +1 @@
+select * from table2;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result
new file mode 100644
index 0000000..d8d43b1
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result
@@ -0,0 +1,7 @@
+id,name,score,type
+-------------------------------
+1,ooo,1.1,a
+2,ppp,2.3,
+3,qqq,null,
+4,,4.5,
+null,xxx,5.6,e
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result
new file mode 100644
index 0000000..d8d43b1
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result
@@ -0,0 +1,7 @@
+id,name,score,type
+-------------------------------
+1,ooo,1.1,a
+2,ppp,2.3,
+3,qqq,null,
+4,,4.5,
+null,xxx,5.6,e
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
index bb628b1..c1047d9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -40,6 +40,7 @@ import org.apache.tajo.storage.compress.CodecPool;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+import org.apache.tajo.util.Bytes;
 import org.apache.tajo.util.BytesUtils;
 
 import java.io.*;
@@ -83,7 +84,8 @@ public class CSVFile {
       this.meta = meta;
       this.schema = schema;
       this.delimiter = StringEscapeUtils.unescapeJava(
-          this.meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER)).getBytes();
+          this.meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER))
+          .getBytes(Bytes.UTF8_CHARSET);
 
       this.columnNum = schema.size();
 
@@ -93,7 +95,7 @@ public class CSVFile {
       if (StringUtils.isEmpty(nullCharacters)) {
         nullChars = NullDatum.get().asTextBytes();
       } else {
-        nullChars = nullCharacters.getBytes();
+        nullChars = nullCharacters.getBytes(Bytes.UTF8_CHARSET);
       }
     }
 
@@ -265,7 +267,8 @@ public class CSVFile {
       //Delimiter
       this.delimiter = StringEscapeUtils.unescapeJava(
           meta.getOption(StorageConstants.TEXT_DELIMITER,
-          meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER))).getBytes();
+          meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER)))
+          .getBytes(Bytes.UTF8_CHARSET);
 
       String nullCharacters = StringEscapeUtils.unescapeJava(
           meta.getOption(StorageConstants.TEXT_NULL,
@@ -274,7 +277,7 @@ public class CSVFile {
       if (StringUtils.isEmpty(nullCharacters)) {
         nullChars = NullDatum.get().asTextBytes();
       } else {
-        nullChars = nullCharacters.getBytes();
+        nullChars = nullCharacters.getBytes(Bytes.UTF8_CHARSET);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
index 1599f62..6a8c7a9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.storage.text;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufProcessor;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.datum.Datum;
@@ -28,9 +29,10 @@ import org.apache.tajo.storage.Tuple;
 import java.io.IOException;
 
 public class CSVLineDeserializer extends TextLineDeserializer {
-  private FieldSplitProcessor processor;
+  private ByteBufProcessor processor;
   private FieldSerializerDeserializer fieldSerDer;
   private ByteBuf nullChars;
+  private int delimiterCompensation;
 
   public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
     super(schema, meta, targetColumnIndexes);
@@ -38,7 +40,13 @@ public class CSVLineDeserializer extends TextLineDeserializer {
 
   @Override
   public void init() {
-    this.processor = new FieldSplitProcessor(CSVLineSerDe.getFieldDelimiter(meta));
+    byte[] delimiter = CSVLineSerDe.getFieldDelimiter(meta);
+    if (delimiter.length == 1) {
+      this.processor = new FieldSplitProcessor(delimiter[0]);
+    } else {
+      this.processor = new MultiBytesFieldSplitProcessor(delimiter);
+    }
+    this.delimiterCompensation = delimiter.length - 1;
 
     if (nullChars != null) {
       nullChars.release();
@@ -67,7 +75,7 @@ public class CSVLineDeserializer extends TextLineDeserializer {
       if (end < 0) {
         fieldLength = rowLength - start;
       } else {
-        fieldLength = end - start;
+        fieldLength = end - start - delimiterCompensation;
       }
 
       if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
index 2fe7f23..988d5d1 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.Bytes;
 
 public class CSVLineSerDe extends TextLineSerDe {
   @Override
@@ -34,8 +35,8 @@ public class CSVLineSerDe extends TextLineSerDe {
     return new CSVLineSerializer(schema, meta);
   }
 
-  public static char getFieldDelimiter(TableMeta meta) {
+  public static byte[] getFieldDelimiter(TableMeta meta) {
     return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER,
-        StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
+        StorageConstants.DEFAULT_FIELD_DELIMITER)).getBytes(Bytes.UTF8_CHARSET);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
index 53a0ef3..9a2fe37 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
@@ -30,8 +30,8 @@ import java.io.OutputStream;
 public class CSVLineSerializer extends TextLineSerializer {
   private FieldSerializerDeserializer serde;
 
-  private byte [] nullChars;
-  private char delimiter;
+  private byte[] nullChars;
+  private byte[] delimiter;
   private int columnNum;
 
   public CSVLineSerializer(Schema schema, TableMeta meta) {
@@ -56,8 +56,8 @@ public class CSVLineSerializer extends TextLineSerializer {
       writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars);
 
       if (columnNum - 1 > i) {
-        out.write((byte) delimiter);
-        writtenBytes += 1;
+        out.write(delimiter);
+        writtenBytes += delimiter.length;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index ebf9608..4c9234e 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -391,7 +391,7 @@ public class DelimitedTextFile {
 
           try {
             deserializer.deserialize(buf, tuple);
-            // if a line is read normaly, it exists this loop.
+            // if a line is read normally, it exits this loop.
             break;
 
           } catch (TextLineParsingError tae) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
index a5ac142..862b5ae 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
@@ -21,9 +21,9 @@ package org.apache.tajo.storage.text;
 import io.netty.buffer.ByteBufProcessor;
 
 public class FieldSplitProcessor implements ByteBufProcessor {
-  private char delimiter; //the ascii separate character
+  private byte delimiter; // the single-byte (ASCII) separator character
 
-  public FieldSplitProcessor(char recordDelimiterByte) {
+  public FieldSplitProcessor(byte recordDelimiterByte) {
     this.delimiter = recordDelimiterByte;
   }
 
@@ -31,8 +31,4 @@ public class FieldSplitProcessor implements ByteBufProcessor {
   public boolean process(byte value) throws Exception {
     return delimiter != value;
   }
-
-  public char getDelimiter() {
-    return delimiter;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java
new file mode 100644
index 0000000..b97d7c6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBufProcessor;
+
+public class MultiBytesFieldSplitProcessor implements ByteBufProcessor {
+
+  private int index;
+  private final byte[] delimiter;
+
+  public MultiBytesFieldSplitProcessor(byte[] recordDelimiterByte) {
+    this.delimiter = recordDelimiterByte;
+  }
+
+  @Override
+  public boolean process(byte value) throws Exception {
+    if (delimiter[index] != value) {
+      index = 0;
+      return true;
+    }
+    if (index != delimiter.length - 1) {
+      index++;
+      return true;
+    }
+    index = 0;
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
index 7ebfa79..89a7de9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
@@ -29,9 +29,9 @@ import java.io.IOException;
  * Reads a text line and fills a Tuple with values
  */
 public abstract class TextLineDeserializer {
-  protected Schema schema;
-  protected TableMeta meta;
-  protected int [] targetColumnIndexes;
+  protected final Schema schema;
+  protected final TableMeta meta;
+  protected final int[] targetColumnIndexes;
 
   public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes) {
     this.schema = schema;

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
index e81e289..1a53bb0 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
@@ -26,6 +26,7 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.BufferPool;
 import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.Bytes;
 
 /**
  * Pluggable Text Line SerDe class
@@ -56,7 +57,7 @@ public abstract class TextLineSerDe {
     if (StringUtils.isEmpty(nullCharacters)) {
       nullChars = NullDatum.get().asTextBytes();
     } else {
-      nullChars = nullCharacters.getBytes();
+      nullChars = nullCharacters.getBytes(Bytes.UTF8_CHARSET);
     }
 
     return nullChars;

http://git-wip-us.apache.org/repos/asf/tajo/blob/82d44af3/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
index 12ea551..2174d62 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
@@ -19,10 +19,12 @@
 package org.apache.tajo.storage;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufProcessor;
 import io.netty.buffer.Unpooled;
 import io.netty.util.CharsetUtil;
 import org.apache.tajo.storage.text.FieldSplitProcessor;
 import org.apache.tajo.storage.text.LineSplitProcessor;
+import org.apache.tajo.storage.text.MultiBytesFieldSplitProcessor;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -35,17 +37,47 @@ public class TestSplitProcessor {
 
   @Test
   public void testFieldSplitProcessor() throws IOException {
-    String data = "abc||de";
+    String data = "abc||de|";
     final ByteBuf buf = releaseLater(
         Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
 
     final int len = buf.readableBytes();
-    FieldSplitProcessor processor = new FieldSplitProcessor('|');
+    FieldSplitProcessor processor = new FieldSplitProcessor((byte)'|');
 
     assertEquals(3, buf.forEachByte(0, len, processor));
     assertEquals(4, buf.forEachByte(4, len - 4, processor));
-    assertEquals(-1, buf.forEachByte(5, len - 5, processor));
+    assertEquals(7, buf.forEachByte(5, len - 5, processor));
+    assertEquals(-1, buf.forEachByte(8, len - 8, processor));
+  }
+
+  @Test
+  public void testMultiCharFieldSplitProcessor1() throws IOException {
+    String data = "abc||||de||";
+    final ByteBuf buf = releaseLater(
+        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
+
+    final int len = buf.readableBytes();
+    ByteBufProcessor processor = new MultiBytesFieldSplitProcessor("||".getBytes());
+
+    assertEquals(4, buf.forEachByte(0, len, processor));
+    assertEquals(6, buf.forEachByte(5, len - 5, processor));
+    assertEquals(10, buf.forEachByte(7, len - 7, processor));
+    assertEquals(-1, buf.forEachByte(11, len - 11, processor));
+  }
+
+  @Test
+  public void testMultiCharFieldSplitProcessor2() throws IOException {
+    String data = "abcㅎㅎdeㅎ";
+    final ByteBuf buf = releaseLater(
+        Unpooled.copiedBuffer(data, CharsetUtil.UTF_8));
+
+    final int len = buf.readableBytes();
+    ByteBufProcessor processor = new MultiBytesFieldSplitProcessor("ㅎ".getBytes());
 
+    assertEquals(5, buf.forEachByte(0, len, processor));
+    assertEquals(8, buf.forEachByte(6, len - 6, processor));
+    assertEquals(13, buf.forEachByte(9, len - 9, processor));
+    assertEquals(-1, buf.forEachByte(14, len - 14, processor));
   }
 
   @Test


Mime
View raw message