drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amansi...@apache.org
Subject [5/6] drill git commit: DRILL-5498: Better handling of CSV column headers
Date Sat, 20 May 2017 14:26:04 GMT
DRILL-5498: Better handling of CSV column headers

See DRILL-5498 for details.

Replaced the repeated varchar reader for reading columns with a purpose
built column parser. Implemented rules to recover from invalid column
headers.

Added missing test method

Changes re code review comments

Back out testing-only change

close apache/drill#830


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f21edb05
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f21edb05
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f21edb05

Branch: refs/heads/master
Commit: f21edb057efd83989d7462910541e528a8779b79
Parents: 7f98400
Author: Paul Rogers <progers@maprtech.com>
Authored: Wed May 10 16:17:24 2017 -0700
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Fri May 19 10:08:31 2017 -0700

----------------------------------------------------------------------
 .../compliant/CompliantTextRecordReader.java    |   9 +-
 .../easy/text/compliant/HeaderBuilder.java      | 274 +++++++++++++++++++
 .../exec/store/easy/text/compliant/TestCsv.java | 150 ++++++++++
 .../easy/text/compliant/TestHeaderBuilder.java  | 223 +++++++++++++++
 .../org/apache/drill/test/ClusterFixture.java   |  13 +-
 5 files changed, 660 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f21edb05/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index 93c4ff8..e253730 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -179,11 +179,7 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
     // don't skip header in case skipFirstLine is set true
     settings.setSkipFirstLine(false);
 
-    // setup Output using OutputMutator
-    // we should use a separate output mutator to avoid reshaping query output with header
data
-    HeaderOutputMutator hOutputMutator = new HeaderOutputMutator();
-    TextOutput hOutput = new RepeatedVarCharOutput(hOutputMutator, getColumns(), true);
-    this.allocate(hOutputMutator.fieldVectorMap);
+    HeaderBuilder hOutput = new HeaderBuilder();
 
     // setup Input using InputStream
    // we should read file header irrespective of split given to this reader
@@ -198,11 +194,10 @@ public class CompliantTextRecordReader extends AbstractRecordReader
{
     reader.parseNext();
 
     // grab the field names from output
-    String [] fieldNames = ((RepeatedVarCharOutput)hOutput).getTextOutput();
+    String [] fieldNames = hOutput.getHeaders();
 
     // cleanup and set to skip the first line next time we read input
     reader.close();
-    hOutputMutator.close();
     settings.setSkipFirstLine(true);
 
     return fieldNames;

http://git-wip-us.apache.org/repos/asf/drill/blob/f21edb05/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java
new file mode 100644
index 0000000..8910c26
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Text output that implements a header reader/parser.
+ * The caller parses out the characters of each header;
+ * this class assembles UTF-8 bytes into Unicode characters,
+ * fixes invalid characters (those not legal for SQL symbols),
+ * and maps duplicate names to unique names.
+ * <p>
+ * That is, this class is as permissive as possible with file
+ * headers to avoid spurious query failures for trivial reasons.
+ */
+
+// Note: this class uses Java heap strings and the usual Java
+// convenience classes. Since we do heavy Unicode string operations,
+// and read a single row, there is no good reason to try to use
+// value vectors and direct memory for this task.
+
+public class HeaderBuilder extends TextOutput {
+
+  /**
+   * Maximum Drill symbol length, as enforced for headers.
+   * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier">
+   * identifier documentation</a>
+   */
+  // TODO: Replace with the proper constant, if available
+  public static final int MAX_HEADER_LEN = 1024;
+
+  /**
+   * Prefix used to replace non-alphabetic characters at the start of
+   * a column name. For example, $foo becomes col_foo. Used
+   * because SQL does not allow _foo.
+   */
+
+  public static final String COLUMN_PREFIX = "col_";
+
+  /**
+   * Prefix used to create numbered columns for missing
+   * headers. Typical names: column_1, column_2, ...
+   */
+
+  public static final String ANONYMOUS_COLUMN_PREFIX = "column_";
+
+  /**
+   * Exception that reports header errors. Is an unchecked exception
+   * to avoid cluttering the normal field reader interface.
+   */
+  public static class HeaderError extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    public HeaderError(String msg) {
+      super(msg);
+    }
+
+    public HeaderError(int colIndex, String msg) {
+      super("Column " + (colIndex + 1) + ": " + msg);
+    }
+  }
+
+  public final List<String> headers = new ArrayList<>();
+  public final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN);
+
+  @Override
+  public void startField(int index) {
+    currentField.clear();
+  }
+
+  @Override
+  public boolean endField() {
+    String header = new String(currentField.array(), 0, currentField.position(), Charsets.UTF_8);
+    header = validateSymbol(header);
+    headers.add(header);
+    return true;
+  }
+
+  @Override
+  public boolean endEmptyField() {
+
+    // Empty header will be rewritten to "column_<n>".
+
+    return endField();
+  }
+
+  /**
+   * Validate the header name according to the SQL lexical rules.
+   * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier">
+   * identifier documentation</a>
+   * @param header the header name to validate
+   */
+
+  // TODO: Replace with existing code, if any.
+  private String validateSymbol(String header) {
+    header = header.trim();
+
+    // To avoid unnecessary query failures, just make up a column name
+    // if the name is missing or all blanks.
+
+    if (header.isEmpty()) {
+      return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1);
+    }
+    if (! Character.isAlphabetic(header.charAt(0))) {
+      return rewriteHeader(header);
+    }
+    for (int i = 1; i < header.length(); i++) {
+      char ch = header.charAt(i);
+      if (! Character.isAlphabetic(ch)  &&
+          ! Character.isDigit(ch)  &&  ch != '_') {
+        return rewriteHeader(header);
+      }
+    }
+    return header;
+  }
+
+  /**
+   * Given an invalid header, rewrite it to replace illegal characters
+   * with valid ones. The header won't be what the user specified,
+   * but it will be a valid SQL identifier. This solution avoids failing
+   * queries due to corrupted or invalid header data.
+   * <p>
+   * Names with invalid first characters are mapped to "col_". Example:
+   * $foo maps to col_foo. If the only character is non-alphabetic, treat
+   * the column as anonymous and create a generic name: column_4, etc.
+   * <p>
+   * This mapping could create a column that exceeds the maximum length
+   * of 1024. Since that is not really a hard limit, we just live with the
+   * extra few characters.
+   *
+   * @param header the original header
+   * @return the rewritten header, valid for SQL
+   */
+
+  private String rewriteHeader(String header) {
+    final StringBuilder buf = new StringBuilder();
+
+    // If starts with non-alphabetic, can't map the character to
+    // underscore, so just tack on a prefix.
+
+    char ch = header.charAt(0);
+    if (Character.isAlphabetic(ch)) {
+      buf.append(ch);
+    } else if (Character.isDigit(ch)) {
+      buf.append(COLUMN_PREFIX);
+      buf.append(ch);
+
+      // For the strange case of only one character, format
+      // the same as an empty header.
+
+    } else if (header.length() == 1) {
+      return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1);
+    } else {
+      buf.append(COLUMN_PREFIX);
+    }
+
+    // Convert all remaining invalid characters to underscores
+
+    for (int i = 1; i < header.length(); i++) {
+      ch = header.charAt(i);
+      if (Character.isAlphabetic(ch)  ||
+          Character.isDigit(ch)  ||  ch == '_') {
+        buf.append(ch);
+      } else {
+        buf.append("_");
+      }
+    }
+    return buf.toString();
+  }
+
+  @Override
+  public void append(byte data) {
+
+    // Ensure the data fits. Note that, if the name is Unicode, the actual
+    // number of characters might be less than the limit even though the
+    // byte count exceeds the limit. Fixing this, in general, would require
+    // a buffer four times larger, so we leave that as a later improvement
+    // if ever needed.
+
+    try {
+      currentField.put(data);
+    } catch (BufferOverflowException e) {
+      throw new HeaderError(headers.size(), "Column exceeds maximum length of " + MAX_HEADER_LEN);
+    }
+  }
+
+  @Override
+  public void finishRecord() {
+    if (headers.isEmpty()) {
+      throw new HeaderError("The file must define at least one header.");
+    }
+
+    // Force headers to be unique.
+
+    final Set<String> idents = new HashSet<String>();
+    for (int i = 0; i < headers.size(); i++) {
+      String header = headers.get(i);
+      String key = header.toLowerCase();
+
+      // Is the header a duplicate?
+
+      if (idents.contains(key)) {
+
+        // Make header unique by appending a suffix.
+        // This loop must end because we have a finite
+        // number of headers.
+        // The original column is assumed to be "1", so
+        // the first duplicate is "2", and so on.
+        // Note that this will map columns of the form:
+        // "col,col,col_2,col_2_2" to
+        // "col", "col_2", "col_2_2", "col_2_2_2".
+        // No mapping scheme is perfect...
+
+        for (int l = 2;  ; l++) {
+          final String rewritten = header + "_" + l;
+          key = rewritten.toLowerCase();
+          if (! idents.contains(key)) {
+            headers.set(i, rewritten);
+            break;
+          }
+        }
+      }
+      idents.add(key);
+    }
+  }
+
+  @Override
+  public long getRecordCount() { return 1; }
+
+  @Override
+  public void startBatch() { }
+
+  @Override
+  public void finishBatch() { }
+
+  @Override
+  public boolean rowHasData() {
+    return ! headers.isEmpty();
+  }
+
+  public String[] getHeaders() {
+
+    // Just return the headers: any needed checks were done in
+    // finishRecord()
+
+    final String array[] = new String[headers.size()];
+    return headers.toArray(array);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f21edb05/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
new file mode 100644
index 0000000..7d38cf9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * SQL-level tests for CSV headers. See
+ * {@link TestHeaderBuilder} for detailed unit tests.
+ * This test does not attempt to duplicate all the cases
+ * from the unit tests; instead it just does a sanity check.
+ */
+
+public class TestCsv extends ClusterTest {
+
+  private static File testDir;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder()
+        .maxParallelization(1)
+        );
+
+    // Set up CSV storage plugin using headers.
+
+    testDir = cluster.makeTempDir("csv");
+    TextFormatConfig csvFormat = new TextFormatConfig();
+    csvFormat.fieldDelimiter = ',';
+    csvFormat.skipFirstLine = false;
+    csvFormat.extractHeader = true;
+    cluster.defineWorkspace("dfs", "data", testDir.getAbsolutePath(), "csv", csvFormat);
+  }
+
+  String emptyHeaders[] = {
+      "",
+      "10,foo,bar"
+  };
+
+  @Test
+  public void testEmptyCsvHeaders() throws IOException {
+    String fileName = "case1.csv";
+    buildFile(fileName, emptyHeaders);
+    try {
+      client.queryBuilder().sql(makeStatement(fileName)).run();
+      fail();
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("must define at least one header"));
+    }
+  }
+
+  String validHeaders[] = {
+      "a,b,c",
+      "10,foo,bar"
+  };
+
+  @Test
+  public void testValidCsvHeaders() throws IOException {
+    String fileName = "case2.csv";
+    buildFile(fileName, validHeaders);
+    RowSet actual = client.queryBuilder().sql(makeStatement(fileName)).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .add("b", MinorType.VARCHAR)
+        .add("c", MinorType.VARCHAR)
+        .build();
+    assertEquals(expectedSchema, actual.batchSchema());
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .add("10", "foo", "bar")
+        .build();
+    new RowSetComparison(expected)
+      .verifyAndClear(actual);
+  }
+
+  String invalidHeaders[] = {
+      "$,,9b,c,c,c_2",
+      "10,foo,bar,fourth,fifth,sixth"
+  };
+
+  @Test
+  public void testInvalidCsvHeaders() throws IOException {
+    String fileName = "case3.csv";
+    buildFile(fileName, invalidHeaders);
+    RowSet actual = client.queryBuilder().sql(makeStatement(fileName)).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("column_1", MinorType.VARCHAR)
+        .add("column_2", MinorType.VARCHAR)
+        .add("col_9b", MinorType.VARCHAR)
+        .add("c", MinorType.VARCHAR)
+        .add("c_2", MinorType.VARCHAR)
+        .add("c_2_2", MinorType.VARCHAR)
+        .build();
+    assertEquals(expectedSchema, actual.batchSchema());
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .add("10", "foo", "bar", "fourth", "fifth", "sixth")
+        .build();
+    new RowSetComparison(expected)
+      .verifyAndClear(actual);
+  }
+
+  private String makeStatement(String fileName) {
+    return "SELECT * FROM `dfs.data`.`" + fileName + "`";
+  }
+
+  private void buildFile(String fileName, String[] data) throws IOException {
+    try(PrintWriter out = new PrintWriter(new FileWriter(new File(testDir, fileName)))) {
+      for (String line : data) {
+        out.println(line);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f21edb05/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java
new file mode 100644
index 0000000..47bb903
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.exec.store.easy.text.compliant.HeaderBuilder.HeaderError;
+import org.apache.drill.test.DrillTest;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+public class TestHeaderBuilder extends DrillTest {
+
+  @Test
+  public void testEmptyHeader() {
+    HeaderBuilder hb = new HeaderBuilder();
+    hb.startBatch();
+    try {
+      hb.finishRecord();
+    } catch (HeaderError e) {
+      assertTrue(e.getMessage().contains("must define at least one header"));
+    }
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"");
+    try {
+      hb.finishRecord();
+    } catch (HeaderError e) {
+      assertTrue(e.getMessage().contains("must define at least one header"));
+    }
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"   ");
+    validateHeader(hb, new String[] {"column_1"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,",");
+    validateHeader(hb, new String[] {"column_1", "column_2"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb," , ");
+    validateHeader(hb, new String[] {"column_1", "column_2"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"a,   ");
+    validateHeader(hb, new String[] {"a", "column_2"});
+  }
+
+  @Test
+  public void testWhiteSpace() {
+    HeaderBuilder hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"a");
+    validateHeader(hb, new String[] {"a"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb," a ");
+    validateHeader(hb, new String[] {"a"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"    a    ");
+    validateHeader(hb, new String[] {"a"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"a,b,c");
+    validateHeader(hb, new String[] {"a","b","c"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb," a , b ,  c ");
+    validateHeader(hb, new String[] {"a","b","c"});
+  }
+
+  @Test
+  public void testSyntax() {
+    HeaderBuilder hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"a_123");
+    validateHeader(hb, new String[] {"a_123"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"a_123_");
+    validateHeader(hb, new String[] {"a_123_"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"az09_");
+    validateHeader(hb, new String[] {"az09_"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"+");
+    validateHeader(hb, new String[] {"column_1"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"+,-");
+    validateHeader(hb, new String[] {"column_1", "column_2"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"+9a");
+    validateHeader(hb, new String[] {"col_9a"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"9a");
+    validateHeader(hb, new String[] {"col_9a"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"a+b");
+    validateHeader(hb, new String[] {"a_b"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"a_b");
+    validateHeader(hb, new String[] {"a_b"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"EXPR$0");
+    validateHeader(hb, new String[] {"EXPR_0"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"(_-^-_)");
+    validateHeader(hb, new String[] {"col_______"});
+  }
+
+  @Test
+  public void testUnicode() {
+    HeaderBuilder hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"Αθήνα");
+    validateHeader(hb, new String[] {"Αθήνα"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"Москва");
+    validateHeader(hb, new String[] {"Москва"});
+
+    hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,"Paris,Αθήνα,Москва");
+    validateHeader(hb, new String[] {"Paris","Αθήνα","Москва"});
+  }
+
+  @Test
+  public void testDuplicateNames() {
+    testParser("a,a", new String[] {"a","a_2"});
+    testParser("a,A", new String[] {"a","A_2"});
+    // It ain't pretty, but it is unique...
+    testParser("a,A,A_2", new String[] {"a","A_2", "A_2_2"});
+    // Verify with non-ASCII characters
+    testParser("Αθήνα,ΑθήνΑ", new String[] {"Αθήνα","ΑθήνΑ_2"});
+  }
+
+  private void testParser(String input, String[] expected) {
+    HeaderBuilder hb = new HeaderBuilder();
+    hb.startBatch();
+    parse(hb,input);
+    hb.finishRecord();
+    validateHeader(hb, expected);
+  }
+
+  private void parse(HeaderBuilder hb, String input) {
+    if (input == null) {
+      return;
+    }
+    byte bytes[] = input.getBytes(Charsets.UTF_8);
+    if (bytes.length == 0) {
+      return;
+    }
+    int fieldIndex = -1;
+    hb.startField(++fieldIndex);
+    for (int i = 0; i < bytes.length; i++) {
+      byte b = bytes[i];
+      if (b == ',') {
+        hb.endField();
+        hb.startField(++fieldIndex);
+      } else {
+        hb.append(b);
+      }
+    }
+    hb.endField();
+  }
+
+  private void validateHeader(HeaderBuilder hb, String[] expected) {
+    String actual[] = hb.getHeaders();
+    assertEquals(expected.length, actual.length);
+    for (int i = 0; i < expected.length; i++) {
+      assertEquals(expected[i], actual[i]);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f21edb05/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 0ce337d..513fe3a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -36,6 +36,7 @@ import org.apache.drill.DrillTestWrapper.TestServices;
 import org.apache.drill.QueryTestUtil;
 import org.apache.drill.TestBuilder;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ZookeeperHelper;
 import org.apache.drill.exec.client.DrillClient;
@@ -526,9 +527,14 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable
{
 
   public void defineWorkspace(String pluginName, String schemaName, String path,
       String defaultFormat) {
+    defineWorkspace(pluginName, schemaName, path, defaultFormat, null);
+  }
+
+  public void defineWorkspace(String pluginName, String schemaName, String path,
+      String defaultFormat, FormatPluginConfig format) {
     for (Drillbit bit : drillbits()) {
       try {
-        defineWorkspace(bit, pluginName, schemaName, path, defaultFormat);
+        defineWorkspace(bit, pluginName, schemaName, path, defaultFormat, format);
       } catch (ExecutionSetupException e) {
         // This functionality is supposed to work in tests. Change
         // exception to unchecked to make test code simpler.
@@ -539,7 +545,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable
{
   }
 
   public static void defineWorkspace(Drillbit drillbit, String pluginName,
-      String schemaName, String path, String defaultFormat)
+      String schemaName, String path, String defaultFormat, FormatPluginConfig format)
       throws ExecutionSetupException {
     @SuppressWarnings("resource")
     final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
@@ -550,6 +556,9 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable
{
 
     pluginConfig.workspaces.remove(schemaName);
     pluginConfig.workspaces.put(schemaName, newTmpWSConfig);
+    if (format != null) {
+      pluginConfig.formats.put(defaultFormat, format);
+    }
 
     pluginRegistry.createOrUpdate(pluginName, pluginConfig, true);
   }


Mime
View raw message