phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maryann...@apache.org
Subject [17/26] phoenix git commit: PHOENIX-3134 varbinary fields bulk load difference between MR/psql and upserts
Date Fri, 27 Jan 2017 01:19:01 GMT
PHOENIX-3134 varbinary fields bulk load difference between MR/psql and upserts


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a6752119
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a6752119
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a6752119

Branch: refs/heads/calcite
Commit: a675211909415ca376e432d25f8a8822aadf5712
Parents: 7332fc3
Author: Ankit Singhal <ankitsinghal59@gmail.com>
Authored: Wed Jan 18 13:27:17 2017 +0530
Committer: Ankit Singhal <ankitsinghal59@gmail.com>
Committed: Wed Jan 18 13:27:17 2017 +0530

----------------------------------------------------------------------
 .../expression/function/EncodeFormat.java       |  4 +-
 .../phoenix/mapreduce/CsvBulkImportUtil.java    |  7 +-
 .../phoenix/mapreduce/CsvBulkLoadTool.java      | 12 ++-
 .../org/apache/phoenix/query/QueryServices.java |  3 +
 .../phoenix/query/QueryServicesOptions.java     |  9 ++-
 .../apache/phoenix/schema/types/PBinary.java    |  6 +-
 .../apache/phoenix/schema/types/PVarbinary.java |  5 +-
 .../org/apache/phoenix/util/PhoenixRuntime.java | 11 +++
 .../phoenix/util/csv/CsvUpsertExecutor.java     | 25 ++++++
 .../phoenix/util/json/JsonUpsertExecutor.java   | 44 +++++++++++
 .../mapreduce/CsvBulkImportUtilTest.java        |  8 +-
 .../util/AbstractUpsertExecutorTest.java        | 82 +++++++++++++++++---
 .../phoenix/util/csv/CsvUpsertExecutorTest.java | 26 +++----
 .../util/json/JsonUpsertExecutorTest.java       |  6 ++
 14 files changed, 207 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java
index ca6cb66..8130228 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java
@@ -20,5 +20,7 @@ package org.apache.phoenix.expression.function;
 public enum EncodeFormat {
 
 	HEX, //format for encoding HEX value to bytes
-	BASE62 //format for encoding a base 10 long value to base 62 string
+	BASE62, //format for encoding a base 10 long value to base 62 string
+	BASE64, //format for encoding binary data as a base 64 string
+	ASCII // Plain Text
 };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
index 9289dbf..ff9ff72 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -38,15 +39,19 @@ public class CsvBulkImportUtil {
      * @param quoteChar quote character for the CSV input
      * @param escapeChar escape character for the CSV input
      * @param arrayDelimiter array delimiter character, can be null
+     * @param binaryEncoding encoding of binary column values in the input (e.g. BASE64 or ASCII), can be null
      */
     public static void initCsvImportJob(Configuration conf, char fieldDelimiter, char quoteChar,
-            char escapeChar, String arrayDelimiter) {
+            char escapeChar, String arrayDelimiter, String binaryEncoding) {
         setChar(conf, CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, fieldDelimiter);
         setChar(conf, CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, quoteChar);
         setChar(conf, CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, escapeChar);
         if (arrayDelimiter != null) {
             conf.set(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, arrayDelimiter);
         }
+        if(binaryEncoding!=null){
+            conf.set(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING, binaryEncoding);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index 8ed66b8..14b8c34 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -35,6 +35,7 @@ public class CsvBulkLoadTool extends AbstractBulkLoadTool {
     static final Option QUOTE_OPT = new Option("q", "quote", true, "Supply a custom phrase
delimiter, defaults to double quote character");
     static final Option ESCAPE_OPT = new Option("e", "escape", true, "Supply a custom escape
character, default is a backslash");
     static final Option ARRAY_DELIMITER_OPT = new Option("a", "array-delimiter", true, "Array
element delimiter (optional)");
+    static final Option binaryEncodingOption = new Option("b", "binaryEncoding", true, "Specifies
binary encoding");
 
     @Override
     protected Options getOptions() {
@@ -43,6 +44,7 @@ public class CsvBulkLoadTool extends AbstractBulkLoadTool {
         options.addOption(QUOTE_OPT);
         options.addOption(ESCAPE_OPT);
         options.addOption(ARRAY_DELIMITER_OPT);
+        options.addOption(binaryEncodingOption);
         return options;
     }
 
@@ -79,13 +81,19 @@ public class CsvBulkLoadTool extends AbstractBulkLoadTool {
             }
             escapeChar = escapeString.charAt(0);
         }
-
+        
+        String binaryEncoding = null;
+        if (cmdLine.hasOption(binaryEncodingOption.getOpt())) {
+            binaryEncoding = cmdLine.getOptionValue(binaryEncodingOption.getOpt());
+        }
+        
         CsvBulkImportUtil.initCsvImportJob(
                 conf,
                 delimiterChar,
                 quoteChar,
                 escapeChar,
-                cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt()));
+                cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt()),
+                binaryEncoding);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 1e002d2..14d9887 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -225,6 +225,9 @@ public interface QueryServices extends SQLCloseable {
     public static final String INDEX_POPULATION_SLEEP_TIME = "phoenix.index.population.wait.time";
     public static final String LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB = "phoenix.client.localIndexUpgrade";
     public static final String LIMITED_QUERY_SERIAL_THRESHOLD = "phoenix.limited.query.serial.threshold";
+    
+    //currently BASE64 and ASCII are supported
+    public static final String UPLOAD_BINARY_DATA_TYPE_ENCODING = "phoenix.upload.binaryDataType.encoding";
 
     public static final String INDEX_ASYNC_BUILD_ENABLED = "phoenix.index.async.build.enabled";
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index ef3ac39..0e8b9d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -79,6 +79,7 @@ import static org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTR
 import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.TRANSACTIONS_ENABLED;
+import static org.apache.phoenix.query.QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING;
 import static org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
 
@@ -98,6 +99,7 @@ import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 
 
+
 /**
  * Options for {@link QueryServices}.
  *
@@ -271,6 +273,10 @@ public class QueryServicesOptions {
       }
     };
     public static final String DEFAULT_SCHEMA = null;
+    public static final String DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING = "BASE64"; // for backward compatibility, till
+                                                                                    // 4.10, psql and CSVBulkLoad
+                                                                                    // expects binary data to be base 64
+                                                                                    // encoded
     
     private final Configuration config;
 
@@ -338,7 +344,8 @@ public class QueryServicesOptions {
             .setIfUnset(IS_NAMESPACE_MAPPING_ENABLED, DEFAULT_IS_NAMESPACE_MAPPING_ENABLED)
             .setIfUnset(IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, DEFAULT_IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE)
             .setIfUnset(LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)
-            .setIfUnset(AUTO_UPGRADE_ENABLED, DEFAULT_AUTO_UPGRADE_ENABLED);
+            .setIfUnset(AUTO_UPGRADE_ENABLED, DEFAULT_AUTO_UPGRADE_ENABLED)
+            .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING);
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set
         // it to 1, so we'll change it.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java
index 43906f0..3a9dcc7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java
@@ -22,7 +22,6 @@ import java.text.Format;
 import java.util.Arrays;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.query.QueryConstants;
@@ -182,10 +181,7 @@ public class PBinary extends PBinaryBase {
 
     @Override
     public Object toObject(String value) {
-        if (value == null || value.length() == 0) {
-            return null;
-        }
-        return Base64.decode(value);
+        return PVarbinary.INSTANCE.toObject(value);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
index d96650d..b3ce57a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
@@ -131,7 +131,10 @@ public class PVarbinary extends PBinaryBase {
         if (value == null || value.length() == 0) {
             return null;
         }
-        return Base64.decode(value);
+        Object object = Base64.decode(value);
+        if (object == null) { throw newIllegalDataException(
+                "Input: [" + value + "]  is not base64 encoded"); }
+        return object;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index dbac76f..3c16d00 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -213,6 +213,9 @@ public class PhoenixRuntime {
             if (execCmd.isLocalIndexUpgrade()) {
                 props.setProperty(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, "false");
             }
+            if (execCmd.binaryEncoding != null) {
+                props.setProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING, execCmd.binaryEncoding);
+            }
             conn = DriverManager.getConnection(jdbcUrl, props).unwrap(PhoenixConnection.class);
             conn.setRunningUpgrade(true);
             if (execCmd.isMapNamespace()) {
@@ -532,6 +535,7 @@ public class PhoenixRuntime {
         private boolean mapNamespace;
         private String srcTable;
         private boolean localIndexUpgrade;
+        private String binaryEncoding;
 
         /**
          * Factory method to build up an {@code ExecutionCommand} based on supplied parameters.
@@ -539,6 +543,8 @@ public class PhoenixRuntime {
         public static ExecutionCommand parseArgs(String[] args) {
             Option tableOption = new Option("t", "table", true,
                     "Overrides the table into which the CSV data is loaded and is case sensitive");
+            Option binaryEncodingOption = new Option("b", "binaryEncoding", true,
+                    "Specifies binary encoding");
             Option headerOption = new Option("h", "header", true, "Overrides the column names
to" +
                     " which the CSV data maps and is case sensitive. A special value of "
+
                     "in-line indicating that the first line of the CSV file determines the
" +
@@ -588,6 +594,7 @@ public class PhoenixRuntime {
             options.addOption(bypassUpgradeOption);
             options.addOption(mapNamespaceOption);
             options.addOption(localIndexUpgradeOption);
+            options.addOption(binaryEncodingOption);
 
             CommandLineParser parser = new PosixParser();
             CommandLine cmdLine = null;
@@ -606,6 +613,10 @@ public class PhoenixRuntime {
             if (cmdLine.hasOption(tableOption.getOpt())) {
                 execCmd.tableName = cmdLine.getOptionValue(tableOption.getOpt());
             }
+            
+            if (cmdLine.hasOption(binaryEncodingOption.getOpt())) {
+                execCmd.binaryEncoding = cmdLine.getOptionValue(binaryEncodingOption.getOpt());
+            }
 
             if (cmdLine.hasOption(headerOption.getOpt())) {
                 String columnString = cmdLine.getOptionValue(headerOption.getOpt());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
index 0d3e17d..cd40b44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
@@ -27,12 +27,18 @@ import java.util.Properties;
 import javax.annotation.Nullable;
 
 import org.apache.commons.csv.CSVRecord;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.function.EncodeFormat;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDataType.PDataCodec;
 import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.UpsertExecutor;
@@ -116,6 +122,7 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String>
{
         private final PDataType dataType;
         private final PDataCodec codec;
         private final DateUtil.DateTimeParser dateTimeParser;
+        private final String binaryEncoding;
 
         SimpleDatatypeConversionFunction(PDataType dataType, Connection conn) {
             Properties props;
@@ -148,6 +155,8 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String>
{
                 this.dateTimeParser = null;
             }
             this.codec = codec;
+            this.binaryEncoding = props.getProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING,
+                            QueryServicesOptions.DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING);
         }
 
         @Nullable
@@ -175,6 +184,22 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String>
{
                         throw new RuntimeException("Invalid boolean value: '" + input
                                 + "', must be one of ['true','t','1','false','f','0']");
                 }
+            }else if (dataType == PVarbinary.INSTANCE || dataType == PBinary.INSTANCE){
+                EncodeFormat format = EncodeFormat.valueOf(binaryEncoding.toUpperCase());
+                Object object = null;
+                switch (format) {
+                    case BASE64:
+                        object = Base64.decode(input);
+                        if (object == null) { throw new IllegalDataException(
+                                "Input: [" + input + "]  is not base64 encoded"); }
+                        break;
+                    case ASCII:
+                        object = Bytes.toBytes(input);
+                        break;
+                    default:
+                        throw new IllegalDataException("Unsupported encoding \"" + binaryEncoding
+ "\"");
+                }
+                return object;
             }
             return dataType.toObject(input);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java
b/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java
index bbe0e30..ffa797d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java
@@ -25,12 +25,20 @@ import java.sql.Types;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.function.EncodeFormat;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.types.PBinary;
+import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.UpsertExecutor;
@@ -137,6 +145,7 @@ public class JsonUpsertExecutor extends UpsertExecutor<Map<?, ?>,
Object> {
 
         private final PDataType dataType;
         private final DateUtil.DateTimeParser dateTimeParser;
+        private final String binaryEncoding;
 
         SimpleDatatypeConversionFunction(PDataType dataType, Connection conn) {
             Properties props;
@@ -166,6 +175,8 @@ public class JsonUpsertExecutor extends UpsertExecutor<Map<?, ?>,
Object> {
             } else {
                 this.dateTimeParser = null;
             }
+            this.binaryEncoding = props.getProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING,
+                    QueryServicesOptions.DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING);
         }
 
         @Nullable
@@ -180,7 +191,40 @@ public class JsonUpsertExecutor extends UpsertExecutor<Map<?, ?>,
Object> {
                 byte[] byteValue = new byte[dataType.getByteSize()];
                 dataType.getCodec().encodeLong(epochTime, byteValue, 0);
                 return dataType.toObject(byteValue);
+            }else if (dataType == PBoolean.INSTANCE) {
+                switch (input.toString()) {
+                case "true":
+                case "t":
+                case "T":
+                case "1":
+                    return Boolean.TRUE;
+                case "false":
+                case "f":
+                case "F":
+                case "0":
+                    return Boolean.FALSE;
+                default:
+                    throw new RuntimeException("Invalid boolean value: '" + input
+                            + "', must be one of ['true','t','1','false','f','0']");
+            }
+        }else if (dataType == PVarbinary.INSTANCE || dataType == PBinary.INSTANCE){
+            EncodeFormat format = EncodeFormat.valueOf(binaryEncoding.toUpperCase());
+            Object object = null;
+            switch (format) {
+                case BASE64:
+                    object = Base64.decode(input.toString());
+                    if (object == null) { throw new IllegalDataException(
+                            "Input: [" + input + "]  is not base64 encoded"); }
+                    break;
+                case ASCII:
+                    object = Bytes.toBytes(input.toString());
+                    break;
+                default:
+                    throw new IllegalDataException("Unsupported encoding \"" + binaryEncoding
+ "\"");
             }
+            return object;
+        }
+            
             return dataType.toObject(input, dataType);
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
index 3c6271a..33c72a8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -28,9 +31,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
 public class CsvBulkImportUtilTest {
 
     @Test
@@ -41,7 +41,7 @@ public class CsvBulkImportUtilTest {
         char quote = '\002';
         char escape = '!';
 
-        CsvBulkImportUtil.initCsvImportJob(conf, delimiter, quote, escape, null);
+        CsvBulkImportUtil.initCsvImportJob(conf, delimiter, quote, escape, null, null);
 
         // Serialize and deserialize the config to ensure that there aren't any issues
         // with non-printable characters as delimiters

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java
index 61b03fb..2b2544d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java
@@ -17,6 +17,12 @@
  */
 package org.apache.phoenix.util;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -25,9 +31,14 @@ import java.sql.SQLException;
 import java.sql.Types;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
 
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.types.PArrayDataType;
+import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PIntegerArray;
 import org.junit.After;
@@ -36,12 +47,6 @@ import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
 public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionlessQueryTest
{
 
     protected Connection conn;
@@ -51,6 +56,7 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles
 
     protected abstract UpsertExecutor<R, F> getUpsertExecutor();
     protected abstract R createRecord(Object... columnValues) throws IOException;
+    protected abstract UpsertExecutor<R, F> getUpsertExecutor(Connection conn);
 
     @Before
     public void setUp() throws SQLException {
@@ -59,7 +65,8 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles
                 new ColumnInfo("NAME", Types.VARCHAR),
                 new ColumnInfo("AGE", Types.INTEGER),
                 new ColumnInfo("VALUES", PIntegerArray.INSTANCE.getSqlType()),
-                new ColumnInfo("BEARD", Types.BOOLEAN));
+                new ColumnInfo("BEARD", Types.BOOLEAN),
+                new ColumnInfo("PIC", Types.BINARY));
 
         preparedStatement = mock(PreparedStatement.class);
         upsertListener = mock(UpsertExecutor.UpsertListener.class);
@@ -73,8 +80,10 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles
 
     @Test
     public void testExecute() throws Exception {
+        byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue();
+        String encodedBinaryData = Base64.encodeBytes(binaryData);
         getUpsertExecutor().execute(createRecord(123L, "NameValue", 42,
-                Arrays.asList(1, 2, 3), true));
+                Arrays.asList(1, 2, 3), true, encodedBinaryData));
 
         verify(upsertListener).upsertDone(1L);
         verifyNoMoreInteractions(upsertListener);
@@ -84,6 +93,7 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles
         verify(preparedStatement).setObject(3, Integer.valueOf(42));
         verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE,
new Object[]{1,2,3}));
         verify(preparedStatement).setObject(5, Boolean.TRUE);
+        verify(preparedStatement).setObject(6, binaryData);
         verify(preparedStatement).execute();
         verifyNoMoreInteractions(preparedStatement);
     }
@@ -99,8 +109,10 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles
 
     @Test
     public void testExecute_TooManyFields() throws Exception {
+        byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue();
+        String encodedBinaryData = Base64.encodeBytes(binaryData);
         R recordWithTooManyFields = createRecord(123L, "NameValue", 42, Arrays.asList(1,
2, 3),
-                true, "Garbage");
+                true, encodedBinaryData, "garbage");
         getUpsertExecutor().execute(recordWithTooManyFields);
 
         verify(upsertListener).upsertDone(1L);
@@ -111,14 +123,17 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends
BaseConnectionles
         verify(preparedStatement).setObject(3, Integer.valueOf(42));
         verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE,
new Object[]{1,2,3}));
         verify(preparedStatement).setObject(5, Boolean.TRUE);
+        verify(preparedStatement).setObject(6, binaryData);
         verify(preparedStatement).execute();
         verifyNoMoreInteractions(preparedStatement);
     }
 
     @Test
     public void testExecute_NullField() throws Exception {
+        byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue();
+        String encodedBinaryData = Base64.encodeBytes(binaryData);
         getUpsertExecutor().execute(createRecord(123L, "NameValue", null,
-                Arrays.asList(1, 2, 3), false));
+                Arrays.asList(1, 2, 3), false, encodedBinaryData));
 
         verify(upsertListener).upsertDone(1L);
         verifyNoMoreInteractions(upsertListener);
@@ -128,17 +143,62 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles
         verify(preparedStatement).setNull(3, columnInfoList.get(2).getSqlType());
         verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3}));
         verify(preparedStatement).setObject(5, Boolean.FALSE);
+        verify(preparedStatement).setObject(6, binaryData);
         verify(preparedStatement).execute();
         verifyNoMoreInteractions(preparedStatement);
     }
 
     @Test
     public void testExecute_InvalidType() throws Exception {
+        byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue();
+        String encodedBinaryData = Base64.encodeBytes(binaryData);
         R recordWithInvalidType = createRecord(123L, "NameValue", "ThisIsNotANumber",
-                Arrays.asList(1, 2, 3), true);
+                Arrays.asList(1, 2, 3), true, encodedBinaryData);
         getUpsertExecutor().execute(recordWithInvalidType);
 
         verify(upsertListener).errorOnRecord(eq(recordWithInvalidType), any(Throwable.class));
         verifyNoMoreInteractions(upsertListener);
     }
+    
+    @Test
+    public void testExecute_InvalidBoolean() throws Exception {
+        byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue();
+        String encodedBinaryData = Base64.encodeBytes(binaryData);
+        R csvRecordWithInvalidType = createRecord("123,NameValue,42,1:2:3,NotABoolean,"+encodedBinaryData);
+        getUpsertExecutor().execute(csvRecordWithInvalidType);
+
+        verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), any(Throwable.class));
+    }
+    
+    @Test
+    public void testExecute_InvalidBinary() throws Exception {
+        String notBase64Encoded="#@$df";
+        R csvRecordWithInvalidType = createRecord("123,NameValue,42,1:2:3,true,"+notBase64Encoded);
+        getUpsertExecutor().execute(csvRecordWithInvalidType);
+
+        verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), any(Throwable.class));
+    }
+    
+    @Test
+    public void testExecute_AsciiEncoded() throws Exception {
+        String asciiValue="#@$df";
+        Properties info=new Properties();
+        info.setProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING,"ASCII");
+        getUpsertExecutor(DriverManager.getConnection(getUrl(),info)).execute(createRecord(123L, "NameValue", 42,
+                Arrays.asList(1, 2, 3), true, asciiValue));
+
+        verify(upsertListener).upsertDone(1L);
+        verifyNoMoreInteractions(upsertListener);
+        
+        verify(preparedStatement).setObject(1, Long.valueOf(123L));
+        verify(preparedStatement).setObject(2, "NameValue");
+        verify(preparedStatement).setObject(3, Integer.valueOf(42));
+        verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3}));
+        verify(preparedStatement).setObject(5, Boolean.TRUE);
+        verify(preparedStatement).setObject(6, Bytes.toBytes(asciiValue));
+        verify(preparedStatement).execute();
+        verifyNoMoreInteractions(preparedStatement);
+    }
+
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
index c887ff7..a5ec4fa 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
@@ -18,23 +18,19 @@
 package org.apache.phoenix.util.csv;
 
 import java.io.IOException;
+import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.List;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
 import org.apache.phoenix.util.AbstractUpsertExecutorTest;
 import org.apache.phoenix.util.UpsertExecutor;
 import org.junit.Before;
-import org.junit.Test;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.verify;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
 
 public class CsvUpsertExecutorTest extends AbstractUpsertExecutorTest<CSVRecord, String> {
 
@@ -46,7 +42,13 @@ public class CsvUpsertExecutorTest extends AbstractUpsertExecutorTest<CSVRecord,
     public UpsertExecutor<CSVRecord, String> getUpsertExecutor() {
         return upsertExecutor;
     }
-
+    
+    @Override
+    public UpsertExecutor<CSVRecord, String> getUpsertExecutor(Connection conn) {
+        return new CsvUpsertExecutor(conn, columnInfoList, preparedStatement,
+                upsertListener, ARRAY_SEP);
+    }
+    
     @Override
     public CSVRecord createRecord(Object... columnValues) throws IOException {
         for (int i = 0; i < columnValues.length; i++) {
@@ -69,11 +71,5 @@ public class CsvUpsertExecutorTest extends AbstractUpsertExecutorTest<CSVRecord,
                 upsertListener, ARRAY_SEP);
     }
 
-    @Test
-    public void testExecute_InvalidBoolean() throws Exception {
-        CSVRecord csvRecordWithInvalidType = createRecord("123,NameValue,42,1:2:3,NotABoolean");
-        upsertExecutor.execute(ImmutableList.of(csvRecordWithInvalidType));
-
-        verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), any(Throwable.class));
-    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java
index c042dd4..6ac9cf9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.util.json;
 
 import java.io.IOException;
+import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
@@ -50,4 +51,9 @@ public class JsonUpsertExecutorTest extends AbstractUpsertExecutorTest<Map<?, ?>
         super.setUp();
         upsertExecutor = new JsonUpsertExecutor(conn, columnInfoList, preparedStatement, upsertListener);
     }
+
+    @Override
+    protected UpsertExecutor<Map<?, ?>, Object> getUpsertExecutor(Connection conn) {
+        return new JsonUpsertExecutor(conn, columnInfoList, preparedStatement, upsertListener);
+    }
 }


Mime
View raw message