phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject [2/2] phoenix git commit: PHOENIX-2238 Support non-printable delimiters
Date Sat, 19 Sep 2015 18:35:30 GMT
PHOENIX-2238 Support non-printable delimiters

Work around serialization issues for non-printable characters
in Hadoop Configuration objects by base64-encoding the delimiter
characters for CSV bulk load.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6dcc8826
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6dcc8826
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6dcc8826

Branch: refs/heads/master
Commit: 6dcc88262386ce6340c1a60a9f0b5e4cb6c787ed
Parents: 8864837
Author: Gabriel Reid <gabrielr@ngdata.com>
Authored: Fri Sep 11 22:50:01 2015 +0200
Committer: Gabriel Reid <greid@apache.org>
Committed: Sat Sep 19 20:30:33 2015 +0200

----------------------------------------------------------------------
 .../phoenix/mapreduce/CsvBulkImportUtil.java    | 22 ++++++--
 .../phoenix/mapreduce/CsvToKeyValueMapper.java  |  6 ++-
 .../mapreduce/CsvBulkImportUtilTest.java        | 57 ++++++++++++++++----
 3 files changed, 71 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dcc8826/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
index 8f0f7d5..6d77cd5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
@@ -19,7 +19,9 @@ package org.apache.phoenix.mapreduce;
 
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Base64;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.util.ColumnInfo;
 
@@ -49,9 +51,9 @@ public class CsvBulkImportUtil {
         Preconditions.checkNotNull(columnInfoList);
         Preconditions.checkArgument(!columnInfoList.isEmpty(), "Column info list is empty");
         conf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, tableName);
-        conf.set(CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, String.valueOf(fieldDelimiter));
-        conf.set(CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, String.valueOf(quoteChar));
-        conf.set(CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, String.valueOf(escapeChar));
+        setChar(conf, CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, fieldDelimiter);
+        setChar(conf, CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, quoteChar);
+        setChar(conf, CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, escapeChar);
         if (arrayDelimiter != null) {
             conf.set(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, arrayDelimiter);
         }
@@ -70,4 +72,18 @@ public class CsvBulkImportUtil {
         conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, processorClass,
                 ImportPreUpsertKeyValueProcessor.class);
     }
+
+    @VisibleForTesting
+    static void setChar(Configuration conf, String confKey, char charValue) {
+        conf.set(confKey, Base64.encodeBytes(Character.toString(charValue).getBytes()));
+    }
+
+    @VisibleForTesting
+    static Character getCharacter(Configuration conf, String confKey) {
+        String strValue = conf.get(confKey);
+        if (strValue == null) {
+            return null;
+        }
+        return new String(Base64.decode(strValue)).charAt(0);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dcc8826/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index 68270d4..2e69048 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -125,8 +125,10 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
         upsertListener = new MapperUpsertListener(
                 context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
         csvUpsertExecutor = buildUpsertExecutor(conf);
-        csvLineParser = new CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0), conf.get(QUOTE_CHAR_CONFKEY).charAt(0),
-                conf.get(ESCAPE_CHAR_CONFKEY).charAt(0));
+        csvLineParser = new CsvLineParser(
+                CsvBulkImportUtil.getCharacter(conf, FIELD_DELIMITER_CONFKEY),
+                CsvBulkImportUtil.getCharacter(conf, QUOTE_CHAR_CONFKEY),
+                CsvBulkImportUtil.getCharacter(conf, ESCAPE_CHAR_CONFKEY));
 
         preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
         if(!conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, "").isEmpty()){

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6dcc8826/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
index a00e228..f52a837 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
@@ -20,6 +20,10 @@ package org.apache.phoenix.mapreduce;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -34,13 +38,13 @@ import com.google.common.collect.ImmutableList;
 public class CsvBulkImportUtilTest {
 
     @Test
-    public void testInitCsvImportJob() {
+    public void testInitCsvImportJob() throws IOException {
         Configuration conf = new Configuration();
 
         String tableName = "SCHEMANAME.TABLENAME";
-        char delimiter = '!';
-        char quote = '"';
-        char escape = '\\';
+        char delimiter = '\001';
+        char quote = '\002';
+        char escape = '!';
 
         List<ColumnInfo> columnInfoList = ImmutableList.of(
                 new ColumnInfo("MYCOL", PInteger.INSTANCE.getSqlType()));
@@ -48,11 +52,27 @@ public class CsvBulkImportUtilTest {
         CsvBulkImportUtil.initCsvImportJob(
                 conf, tableName, delimiter, quote, escape, null, columnInfoList, true);
 
-        assertEquals(tableName, conf.get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY));
-        assertEquals("!", conf.get(CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY));
-        assertNull(conf.get(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY));
-        assertEquals(columnInfoList, CsvToKeyValueMapper.buildColumnInfoList(conf));
-        assertEquals(true, conf.getBoolean(CsvToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, false));
+        // Serialize and deserialize the config to ensure that there aren't any issues
+        // with non-printable characters as delimiters
+        File tempFile = File.createTempFile("test-config", ".xml");
+        FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
+        conf.writeXml(fileOutputStream);
+        fileOutputStream.close();
+        Configuration deserialized = new Configuration();
+        deserialized.addResource(new FileInputStream(tempFile));
+
+        assertEquals(tableName, deserialized.get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY));
+        assertEquals(Character.valueOf('\001'),
+                CsvBulkImportUtil.getCharacter(deserialized, CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY));
+        assertEquals(Character.valueOf('\002'),
+                CsvBulkImportUtil.getCharacter(deserialized, CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY));
+        assertEquals(Character.valueOf('!'),
+                CsvBulkImportUtil.getCharacter(deserialized, CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY));
+        assertNull(deserialized.get(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY));
+        assertEquals(columnInfoList, CsvToKeyValueMapper.buildColumnInfoList(deserialized));
+        assertEquals(true, deserialized.getBoolean(CsvToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, false));
+
+        tempFile.delete();
     }
 
     @Test
@@ -63,6 +83,25 @@ public class CsvBulkImportUtilTest {
         assertEquals(MockProcessor.class, processor.getClass());
     }
 
+    @Test
+    public void testGetAndSetChar_BasicChar() {
+        Configuration conf = new Configuration();
+        CsvBulkImportUtil.setChar(conf, "conf.key", '|');
+        assertEquals(Character.valueOf('|'), CsvBulkImportUtil.getCharacter(conf, "conf.key"));
+    }
+
+    @Test
+    public void testGetAndSetChar_NonPrintableChar() {
+        Configuration conf = new Configuration();
+        CsvBulkImportUtil.setChar(conf, "conf.key", '\001');
+        assertEquals(Character.valueOf('\001'), CsvBulkImportUtil.getCharacter(conf, "conf.key"));
+    }
+
+    @Test
+    public void testGetChar_NotPresent() {
+        Configuration conf = new Configuration();
+        assertNull(CsvBulkImportUtil.getCharacter(conf, "conf.key"));
+    }
 
     public static class MockProcessor implements ImportPreUpsertKeyValueProcessor {
 


Mime
View raw message