phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [4/5] PHOENIX-66 Support array creation from CSV file (GabrielReid)
Date Wed, 12 Mar 2014 00:24:58 GMT
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f15c7f1e/phoenix-core/src/test/java/org/apache/phoenix/end2end/CSVCommonsLoaderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/end2end/CSVCommonsLoaderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/end2end/CSVCommonsLoaderTest.java
index 5625580..7d4cbff 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/end2end/CSVCommonsLoaderTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/end2end/CSVCommonsLoaderTest.java
@@ -34,6 +34,8 @@ import java.util.Collections;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PArrayDataType;
+import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.util.CSVCommonsLoader;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -41,520 +43,559 @@ import org.junit.Test;
 
 public class CSVCommonsLoaderTest extends BaseHBaseManagedTimeTest {
 
-	private static final String DATATYPE_TABLE = "DATATYPE";
-	private static final String DATATYPES_CSV_VALUES = "CKEY, CVARCHAR, CINTEGER, CDECIMAL, CUNSIGNED_INT, CBOOLEAN, CBIGINT, CUNSIGNED_LONG, CTIME, CDATE\n"
-			+ "KEY1,A,2147483647,1.1,0,TRUE,9223372036854775807,0,1990-12-31 10:59:59,1999-12-31 23:59:59\n"
-			+ "KEY2,B,-2147483648,-1.1,2147483647,FALSE,-9223372036854775808,9223372036854775807,2000-01-01 00:00:01,2012-02-29 23:59:59\n";
-	private static final String STOCK_TABLE = "STOCK_SYMBOL";
-	private static final String STOCK_CSV_VALUES = "AAPL,APPLE Inc.\n"
-			+ "CRM,SALESFORCE\n" + "GOOG,Google\n"
-			+ "HOG,Harlet-Davidson Inc.\n" + "HPQ,Hewlett Packard\n"
-			+ "INTC,Intel\n" + "MSFT,Microsoft\n" + "WAG,Walgreens\n"
-			+ "WMT,Walmart\n";
-	private static final String[] STOCK_COLUMNS_WITH_BOGUS = new String[] {
-			"SYMBOL", "BOGUS" };
-	private static final String[] STOCK_COLUMNS = new String[] { "SYMBOL",
-			"COMPANY" };
-	private static final String STOCK_CSV_VALUES_WITH_HEADER = STOCK_COLUMNS[0]
-			+ "," + STOCK_COLUMNS[1] + "\n" + STOCK_CSV_VALUES;
-	private static final String STOCK_CSV_VALUES_WITH_DELIMITER = "APPL"
-			+ '\u0001' + '\u0002' + "APPLE\n" + " Inc" + '\u0002' + "\n"
-			+ "MSFT" + '\u0001' + "Microsoft\n";
-
-	private static final String STOCK_TDV_VALUES = "AAPL\tAPPLE Inc\n"
-			+ "CRM\tSALESFORCE\n" + "GOOG\tGoogle\n"
-			+ "HOG\tHarlet-Davidson Inc.\n" + "HPQ\tHewlett Packard\n"
-			+ "INTC\tIntel\n" + "MSFT\tMicrosoft\n" + "WAG\tWalgreens\n"
-			+ "WMT\tWalmart\n";
-	private static final String STOCK_TDV_VALUES_WITH_HEADER = STOCK_COLUMNS[0]
-			+ "\t" + STOCK_COLUMNS[1] + "\n" + STOCK_TDV_VALUES;
-
-
-	private static final String ENCAPSULATED_CHARS_TABLE = "ENCAPSULATEDCHAR";
-	private static final String[] ENCAPSULATED_CHARS_COLUMNS = new String[] {
-			"MYKEY", "MYVALUE" };
-	private static final String CSV_VALUES_ENCAPSULATED_CONTROL_CHARS = "ALL THREEF,\"This has a all three , , \"\" \r\n in it. \"\n"
-			+ "COMMA,\"This has a comma , in it. \"\n"
-			+ "CRLF,\"This has a crlf \r\n in it. \"\n"
-			+ "QUOTE,\"This has a quote \"\" in it. \"\n";
-	private static final String CSV_VALUES_ENCAPSULATED_CONTROL_CHARS_WITH_HEADER = ENCAPSULATED_CHARS_COLUMNS[0]
-			+ ","
-			+ ENCAPSULATED_CHARS_COLUMNS[1]
-			+ "\n"
-			+ CSV_VALUES_ENCAPSULATED_CONTROL_CHARS;
-	private static final String CSV_VALUES_BAD_ENCAPSULATED_CONTROL_CHARS = "ALL THREEF,\"This has a all three , , \"\" \r\n in it. \"\n"
-			+ "COMMA,\"This has a comma , in it. \"\n"
-			+ "CRLF,\"This has a crlf \r\n in it. \"\n"
-			+ "BADENCAPSULATEDQUOTE,\"\"This has a bad quote in it. \"\n";
-	private static final String CSV_VALUES_BAD_ENCAPSULATED_CONTROL_CHARS_WITH_HEADER = ENCAPSULATED_CHARS_COLUMNS[0]
-			+ ","
-			+ ENCAPSULATED_CHARS_COLUMNS[1]
-			+ "\n"
-			+ CSV_VALUES_BAD_ENCAPSULATED_CONTROL_CHARS;
-
-	@Test
-	public void testCSVCommonsUpsert() throws Exception {
-		CSVParser parser = null;
-		PhoenixConnection conn = null;
-		try {
-
-			// Create table
-			String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
-					+ "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
-			conn = DriverManager.getConnection(getUrl()).unwrap(
-					PhoenixConnection.class);
-			PhoenixRuntime.executeStatements(conn,
-					new StringReader(statements), null);
-
-			// Upsert CSV file
-			CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,
-					Collections.<String> emptyList(), true);
-			csvUtil.upsert(new StringReader(STOCK_CSV_VALUES_WITH_HEADER));
-
-			// Compare Phoenix ResultSet with CSV file content
-			PreparedStatement statement = conn
-					.prepareStatement("SELECT SYMBOL, COMPANY FROM "
-							+ STOCK_TABLE);
-			ResultSet phoenixResultSet = statement.executeQuery();
-			parser = new CSVParser(new StringReader(
-					STOCK_CSV_VALUES_WITH_HEADER), csvUtil.getFormat());
-			for (CSVRecord record : parser) {
-				assertTrue(phoenixResultSet.next());
-				int i = 0;
-				for (String value : record) {
-					assertEquals(value, phoenixResultSet.getString(i + 1));
-					i++;
-				}
-			}
-			assertFalse(phoenixResultSet.next());
-		} finally {
-			if (parser != null)
-				parser.close();
-			if (conn != null)
-				conn.close();
-		}
-	}
-
-	@Test
-	public void testTDVCommonsUpsert() throws Exception {
-		CSVParser parser = null;
-		PhoenixConnection conn = null;
-		try {
-
-			// Create table
-			String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
-					+ "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
-			conn = DriverManager.getConnection(getUrl()).unwrap(
-					PhoenixConnection.class);
-			PhoenixRuntime.executeStatements(conn,
-					new StringReader(statements), null);
-
-			// Upsert TDV file
-			CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,Collections.<String> emptyList()
-					, true,Arrays.asList("\t", "0", "0"));
-			csvUtil.upsert(new StringReader(STOCK_TDV_VALUES_WITH_HEADER));
-
-			// Compare Phoenix ResultSet with CSV file content
-			PreparedStatement statement = conn
-					.prepareStatement("SELECT SYMBOL, COMPANY FROM "
-							+ STOCK_TABLE);
-			ResultSet phoenixResultSet = statement.executeQuery();
-			parser = new CSVParser(new StringReader(
-					STOCK_TDV_VALUES_WITH_HEADER), csvUtil.getFormat());
-			for (CSVRecord record : parser) {
-				assertTrue(phoenixResultSet.next());
-				int i = 0;
-				for (String value : record) {
-					assertEquals(value, phoenixResultSet.getString(i + 1));
-					i++;
-				}
-			}
-			assertFalse(phoenixResultSet.next());
-		} finally {
-			if (parser != null)
-				parser.close();
-			if (conn != null)
-				conn.close();
-		}
-	}
-
-	@Test
-	public void testCSVUpsertWithCustomDelimiters() throws Exception {
-		CSVParser parser = null;
-		PhoenixConnection conn = null;
-		try {
-			// Create table
-			String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
-					+ "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
-			conn = DriverManager.getConnection(getUrl()).unwrap(
-					PhoenixConnection.class);
-			PhoenixRuntime.executeStatements(conn,
-					new StringReader(statements), null);
-
-			// Upsert CSV file
-			CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,
-					Arrays.<String> asList(STOCK_COLUMNS), true, Arrays.asList(
-							"1", "2", "3"));
-			csvUtil.upsert(new StringReader(STOCK_CSV_VALUES_WITH_DELIMITER));
-
-			// Compare Phoenix ResultSet with CSV file content
-			PreparedStatement statement = conn
-					.prepareStatement("SELECT SYMBOL, COMPANY FROM "
-							+ STOCK_TABLE);
-			ResultSet phoenixResultSet = statement.executeQuery();
-			parser = new CSVParser(new StringReader(
-					STOCK_CSV_VALUES_WITH_DELIMITER), csvUtil.getFormat());
-			for (CSVRecord record : parser) {
-				assertTrue(phoenixResultSet.next());
-				int i = 0;
-				for (String value : record) {
-					assertEquals(value, phoenixResultSet.getString(i + 1));
-					i++;
-				}
-			}
-			assertFalse(phoenixResultSet.next());
-		} finally {
-			if (parser != null)
-				parser.close();
-			if (conn != null)
-				conn.close();
-		}
-	}
-
-	@Test
-	public void testCSVUpsertWithColumns() throws Exception {
-		CSVParser parser = null;
-		PhoenixConnection conn = null;
-		try {
-			// Create table
-			String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
-					+ "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
-			conn = DriverManager.getConnection(getUrl())
-					.unwrap(PhoenixConnection.class);
-			PhoenixRuntime.executeStatements(conn,
-					new StringReader(statements), null);
-
-			// Upsert CSV file
-			CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,
-					Arrays.<String> asList(STOCK_COLUMNS), true);
-			// no header
-			csvUtil.upsert(new StringReader(STOCK_CSV_VALUES));
-
-			// Compare Phoenix ResultSet with CSV file content
-			PreparedStatement statement = conn
-					.prepareStatement("SELECT SYMBOL, COMPANY FROM "
-							+ STOCK_TABLE);
-			ResultSet phoenixResultSet = statement.executeQuery();
-			parser = new CSVParser(new StringReader(
-					STOCK_CSV_VALUES), csvUtil.getFormat());
-			for (CSVRecord record : parser) {
-				assertTrue(phoenixResultSet.next());
-				int i = 0;
-				for (String value : record) {
-					assertEquals(value, phoenixResultSet.getString(i + 1));
-					i++;
-				}
-			}
-
-			assertFalse(phoenixResultSet.next());
-		} finally {
-			if (parser != null)
-				parser.close();
-			if (conn != null)
-				conn.close();
-		}
-	}
-
-	@Test
-	public void testCSVUpsertWithNoColumns() throws Exception {
-		CSVParser parser = null;
-		PhoenixConnection conn = null;
-		try {
-			// Create table
-			String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
-					+ "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
-			 conn = DriverManager.getConnection(getUrl())
-					.unwrap(PhoenixConnection.class);
-			PhoenixRuntime.executeStatements(conn,
-					new StringReader(statements), null);
-
-			// Upsert CSV file
-			CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,
-					null, true);
-			csvUtil.upsert(new StringReader(STOCK_CSV_VALUES));
-
-			// Compare Phoenix ResultSet with CSV file content
-			PreparedStatement statement = conn
-					.prepareStatement("SELECT SYMBOL, COMPANY FROM "
-							+ STOCK_TABLE);
-			ResultSet phoenixResultSet = statement.executeQuery();
-			parser = new CSVParser(new StringReader(
-					STOCK_CSV_VALUES), csvUtil.getFormat());
-			for (CSVRecord record : parser) {
-				assertTrue(phoenixResultSet.next());
-				int i = 0;
-				for (String value : record) {
-					assertEquals(value, phoenixResultSet.getString(i + 1));
-					i++;
-				}
-			}
-
-			assertFalse(phoenixResultSet.next());
-		} finally {
-			if (parser != null)
-				parser.close();
-			if (conn != null)
-				conn.close();
-		}
-	}
-
-	@Test
-	public void testCSVUpsertWithBogusColumn() throws Exception {
-		CSVParser parser = null;
-		PhoenixConnection conn = null;
-		try {
-			// Create table
-			String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
-					+ "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
-			conn = DriverManager.getConnection(getUrl())
-					.unwrap(PhoenixConnection.class);
-			PhoenixRuntime.executeStatements(conn,
-					new StringReader(statements), null);
-
-			// Upsert CSV file, not strict
-			CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,
-					Arrays.asList(STOCK_COLUMNS_WITH_BOGUS), false);
-			csvUtil.upsert(new StringReader(STOCK_CSV_VALUES));
-
-			// Compare Phoenix ResultSet with CSV file content
-			PreparedStatement statement = conn
-					.prepareStatement("SELECT SYMBOL, COMPANY FROM "
-							+ STOCK_TABLE);
-			ResultSet phoenixResultSet = statement.executeQuery();
-			parser = new CSVParser(new StringReader(STOCK_CSV_VALUES),
-					csvUtil.getFormat());
-			for (CSVRecord record : parser) {
-				assertTrue(phoenixResultSet.next());
-				assertEquals(record.get(0), phoenixResultSet.getString(1));
-				assertNull(phoenixResultSet.getString(2));
-			}
-
-			assertFalse(phoenixResultSet.next());
-		} finally {
-			if (parser != null)
-				parser.close();
-			if (conn != null)
-				conn.close();
-		}
-	}
-
-	@Test
-	public void testCSVUpsertWithAllColumn() throws Exception {
-		CSVParser parser = null;
-		PhoenixConnection conn = null;
-		try {
-			// Create table
-			String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
-					+ "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
-			conn = DriverManager.getConnection(getUrl())
-					.unwrap(PhoenixConnection.class);
-			PhoenixRuntime.executeStatements(conn,
-					new StringReader(statements), null);
-
-			// Upsert CSV file
-			CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,
-					Arrays.asList("FOO", "BAR"), false);
-
-			try {
-				csvUtil.upsert(new StringReader(STOCK_CSV_VALUES));
-				fail();
-			} catch (SQLException e) {
-				assertTrue(
-						e.getMessage(),
-						e.getMessage()
-								.contains(
-										"ERROR 504 (42703): Undefined column. columnName=STOCK_SYMBOL.[FOO, BAR]"));
-			}
-		} finally {
-			if (parser != null)
-				parser.close();
-			if (conn != null)
-				conn.close();
-		}
-	}
-
-	@Test
-	public void testCSVUpsertWithBogusColumnStrict() throws Exception {
-		CSVParser parser = null;
-		PhoenixConnection conn = null;
-		try {
-			// Create table
-			String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
-					+ "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
-			conn = DriverManager.getConnection(getUrl())
-					.unwrap(PhoenixConnection.class);
-			PhoenixRuntime.executeStatements(conn,
-					new StringReader(statements), null);
-
-			// Upsert CSV file
-			CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,
-					Arrays.asList(STOCK_COLUMNS_WITH_BOGUS), true);
-			try {
-				csvUtil.upsert(new StringReader(STOCK_CSV_VALUES));
-				fail();
-			} catch (SQLException e) {
-				assertTrue(
-						e.getMessage(),
-						e.getMessage()
-								.contains(
-										"ERROR 504 (42703): Undefined column. columnName=STOCK_SYMBOL.BOGUS"));
-			}
-		} finally {
-			if (parser != null)
-				parser.close();
-			if (conn != null)
-				conn.close();
-		}
-	}
-
-	@Test
-	public void testAllDatatypes() throws Exception {
-		CSVParser parser = null;
-		PhoenixConnection conn = null;
-		try {
-			// Create table
-			String statements = "CREATE TABLE IF NOT EXISTS "
-					+ DATATYPE_TABLE
-					+ " (CKEY VARCHAR NOT NULL PRIMARY KEY,"
-					+ "  CVARCHAR VARCHAR, CINTEGER INTEGER, CDECIMAL DECIMAL(31,10), CUNSIGNED_INT UNSIGNED_INT, CBOOLEAN BOOLEAN, CBIGINT BIGINT, CUNSIGNED_LONG UNSIGNED_LONG, CTIME TIME, CDATE DATE);";
-			conn = DriverManager.getConnection(getUrl())
-					.unwrap(PhoenixConnection.class);
-			PhoenixRuntime.executeStatements(conn,
-					new StringReader(statements), null);
-
-			// Upsert CSV file
-			CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn,
-					DATATYPE_TABLE, Collections.<String> emptyList(), true);
-			csvUtil.upsert(new StringReader(DATATYPES_CSV_VALUES));
-
-			// Compare Phoenix ResultSet with CSV file content
-			PreparedStatement statement = conn
-					.prepareStatement("SELECT CKEY, CVARCHAR, CINTEGER, CDECIMAL, CUNSIGNED_INT, CBOOLEAN, CBIGINT, CUNSIGNED_LONG, CTIME, CDATE FROM "
-							+ DATATYPE_TABLE);
-			ResultSet phoenixResultSet = statement.executeQuery();
-			parser = new CSVParser(new StringReader(DATATYPES_CSV_VALUES),
-					csvUtil.getFormat());
-
-			for (CSVRecord record : parser) {
-				assertTrue(phoenixResultSet.next());
-				int i = 0;
-				int size = record.size();
-				for (String value : record) {
-					assertEquals(value, phoenixResultSet.getObject(i + 1)
-							.toString().toUpperCase());
-					if (i < size - 2)
-						break;
-					i++;
-				}
-				// special case for matching date, time values
-				assertEquals(DateUtil.parseTime(record.get(8)),
-						phoenixResultSet.getTime("CTIME"));
-				assertEquals(DateUtil.parseDate(record.get(9)),
-						phoenixResultSet.getDate("CDATE"));
-			}
-
-			assertFalse(phoenixResultSet.next());
-		} finally {
-			if (parser != null)
-				parser.close();
-			if (conn != null)
-				conn.close();
-		}
-	}
-
-	@Test
-	public void testCSVCommonsUpsertEncapsulatedControlChars() throws Exception {
-		CSVParser parser = null;
-		PhoenixConnection conn = null;
-		try {
-			// Create table
-			String statements = "CREATE TABLE IF NOT EXISTS "
-					+ ENCAPSULATED_CHARS_TABLE
-					+ "(MYKEY VARCHAR NOT NULL PRIMARY KEY, MYVALUE VARCHAR);";
-			conn = DriverManager.getConnection(getUrl())
-					.unwrap(PhoenixConnection.class);
-			PhoenixRuntime.executeStatements(conn,
-					new StringReader(statements), null);
-
-			// Upsert CSV file
-			CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn,
-					ENCAPSULATED_CHARS_TABLE, Collections.<String> emptyList(),
-					true);
-			csvUtil.upsert(new StringReader(
-					CSV_VALUES_ENCAPSULATED_CONTROL_CHARS_WITH_HEADER));
-
-			// Compare Phoenix ResultSet with CSV file content
-			PreparedStatement statement = conn
-					.prepareStatement("SELECT MYKEY, MYVALUE FROM "
-							+ ENCAPSULATED_CHARS_TABLE);
-			ResultSet phoenixResultSet = statement.executeQuery();
-			parser = new CSVParser(new StringReader(
-					CSV_VALUES_ENCAPSULATED_CONTROL_CHARS_WITH_HEADER),
-					csvUtil.getFormat());
-			for (CSVRecord record : parser) {
-				assertTrue(phoenixResultSet.next());
-				int i = 0;
-				for (String value : record) {
-					assertEquals(value, phoenixResultSet.getString(i + 1));
-					i++;
-				}
-			}
-
-			assertFalse(phoenixResultSet.next());
-		} finally {
-			if (parser != null)
-				parser.close();
-			if (conn != null)
-				conn.close();
-		}
-	}
-
-	@Test
-	public void testCSVCommonsUpsertBadEncapsulatedControlChars()
-			throws Exception {
-		CSVParser parser = null;
-		PhoenixConnection conn = null;
-		try {
-			// Create table
-			String statements = "CREATE TABLE IF NOT EXISTS "
-					+ ENCAPSULATED_CHARS_TABLE
-					+ "(MYKEY VARCHAR NOT NULL PRIMARY KEY, MYVALUE VARCHAR);";
-			conn = DriverManager.getConnection(getUrl())
-					.unwrap(PhoenixConnection.class);
-			PhoenixRuntime.executeStatements(conn,
-					new StringReader(statements), null);
-
-			// Upsert CSV file
-			CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn,
-					ENCAPSULATED_CHARS_TABLE, Collections.<String> emptyList(),
-					true);
-			try {
-				csvUtil.upsert(new StringReader(
-						CSV_VALUES_BAD_ENCAPSULATED_CONTROL_CHARS_WITH_HEADER));
-				fail();
-			} catch (RuntimeException e) {
-				assertTrue(
-						e.getMessage(),
-						e.getMessage()
-								.contains(
-										"invalid char between encapsulated token and delimiter"));
-			}
-		} finally {
-			if (parser != null)
-				parser.close();
-			if (conn != null)
-				conn.close();
-		}
-	}
+    private static final String DATATYPE_TABLE = "DATATYPE";
+    private static final String DATATYPES_CSV_VALUES = "CKEY, CVARCHAR, CINTEGER, CDECIMAL, CUNSIGNED_INT, CBOOLEAN, CBIGINT, CUNSIGNED_LONG, CTIME, CDATE\n"
+            + "KEY1,A,2147483647,1.1,0,TRUE,9223372036854775807,0,1990-12-31 10:59:59,1999-12-31 23:59:59\n"
+            + "KEY2,B,-2147483648,-1.1,2147483647,FALSE,-9223372036854775808,9223372036854775807,2000-01-01 00:00:01,2012-02-29 23:59:59\n";
+    private static final String STOCK_TABLE = "STOCK_SYMBOL";
+    private static final String STOCK_CSV_VALUES = "AAPL,APPLE Inc.\n"
+            + "CRM,SALESFORCE\n" + "GOOG,Google\n"
+            + "HOG,Harlet-Davidson Inc.\n" + "HPQ,Hewlett Packard\n"
+            + "INTC,Intel\n" + "MSFT,Microsoft\n" + "WAG,Walgreens\n"
+            + "WMT,Walmart\n";
+    private static final String[] STOCK_COLUMNS_WITH_BOGUS = new String[] {
+            "SYMBOL", "BOGUS" };
+    private static final String[] STOCK_COLUMNS = new String[] { "SYMBOL",
+            "COMPANY" };
+    private static final String STOCK_CSV_VALUES_WITH_HEADER = STOCK_COLUMNS[0]
+            + "," + STOCK_COLUMNS[1] + "\n" + STOCK_CSV_VALUES;
+    private static final String STOCK_CSV_VALUES_WITH_DELIMITER = "APPL"
+            + '\u0001' + '\u0002' + "APPLE\n" + " Inc" + '\u0002' + "\n"
+            + "MSFT" + '\u0001' + "Microsoft\n";
+
+    private static final String STOCK_TDV_VALUES = "AAPL\tAPPLE Inc\n"
+            + "CRM\tSALESFORCE\n" + "GOOG\tGoogle\n"
+            + "HOG\tHarlet-Davidson Inc.\n" + "HPQ\tHewlett Packard\n"
+            + "INTC\tIntel\n" + "MSFT\tMicrosoft\n" + "WAG\tWalgreens\n"
+            + "WMT\tWalmart\n";
+    private static final String STOCK_TDV_VALUES_WITH_HEADER = STOCK_COLUMNS[0]
+            + "\t" + STOCK_COLUMNS[1] + "\n" + STOCK_TDV_VALUES;
+
+
+    private static final String ENCAPSULATED_CHARS_TABLE = "ENCAPSULATEDCHAR";
+    private static final String[] ENCAPSULATED_CHARS_COLUMNS = new String[] {
+            "MYKEY", "MYVALUE" };
+    private static final String CSV_VALUES_ENCAPSULATED_CONTROL_CHARS = "ALL THREEF,\"This has a all three , , \"\" \r\n in it. \"\n"
+            + "COMMA,\"This has a comma , in it. \"\n"
+            + "CRLF,\"This has a crlf \r\n in it. \"\n"
+            + "QUOTE,\"This has a quote \"\" in it. \"\n";
+    private static final String CSV_VALUES_ENCAPSULATED_CONTROL_CHARS_WITH_HEADER = ENCAPSULATED_CHARS_COLUMNS[0]
+            + ","
+            + ENCAPSULATED_CHARS_COLUMNS[1]
+            + "\n"
+            + CSV_VALUES_ENCAPSULATED_CONTROL_CHARS;
+    private static final String CSV_VALUES_BAD_ENCAPSULATED_CONTROL_CHARS = "ALL THREEF,\"This has a all three , , \"\" \r\n in it. \"\n"
+            + "COMMA,\"This has a comma , in it. \"\n"
+            + "CRLF,\"This has a crlf \r\n in it. \"\n"
+            + "BADENCAPSULATEDQUOTE,\"\"This has a bad quote in it. \"\n";
+    private static final String CSV_VALUES_BAD_ENCAPSULATED_CONTROL_CHARS_WITH_HEADER = ENCAPSULATED_CHARS_COLUMNS[0]
+            + ","
+            + ENCAPSULATED_CHARS_COLUMNS[1]
+            + "\n"
+            + CSV_VALUES_BAD_ENCAPSULATED_CONTROL_CHARS;
+
+    @Test
+    public void testCSVCommonsUpsert() throws Exception {
+        CSVParser parser = null;
+        PhoenixConnection conn = null;
+        try {
+
+            // Create table
+            String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
+                    + "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
+            conn = DriverManager.getConnection(getUrl()).unwrap(
+                    PhoenixConnection.class);
+            PhoenixRuntime.executeStatements(conn,
+                    new StringReader(statements), null);
+
+            // Upsert CSV file
+            CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,
+                    Collections.<String> emptyList(), true);
+            csvUtil.upsert(new StringReader(STOCK_CSV_VALUES_WITH_HEADER));
+
+            // Compare Phoenix ResultSet with CSV file content
+            PreparedStatement statement = conn
+                    .prepareStatement("SELECT SYMBOL, COMPANY FROM "
+                            + STOCK_TABLE);
+            ResultSet phoenixResultSet = statement.executeQuery();
+            parser = new CSVParser(new StringReader(
+                    STOCK_CSV_VALUES_WITH_HEADER), csvUtil.getFormat());
+            for (CSVRecord record : parser) {
+                assertTrue(phoenixResultSet.next());
+                int i = 0;
+                for (String value : record) {
+                    assertEquals(value, phoenixResultSet.getString(i + 1));
+                    i++;
+                }
+            }
+            assertFalse(phoenixResultSet.next());
+        } finally {
+            if (parser != null)
+                parser.close();
+            if (conn != null)
+                conn.close();
+        }
+    }
+
+    @Test
+    public void testTDVCommonsUpsert() throws Exception {
+        CSVParser parser = null;
+        PhoenixConnection conn = null;
+        try {
+
+            // Create table
+            String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
+                    + "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
+            conn = DriverManager.getConnection(getUrl()).unwrap(
+                    PhoenixConnection.class);
+            PhoenixRuntime.executeStatements(conn,
+                    new StringReader(statements), null);
+
+            // Upsert TDV file
+            CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,Collections.<String> emptyList()
+                    , true,Arrays.asList("\t", "0", "0"), CSVCommonsLoader.DEFAULT_ARRAY_ELEMENT_SEPARATOR);
+            csvUtil.upsert(new StringReader(STOCK_TDV_VALUES_WITH_HEADER));
+
+            // Compare Phoenix ResultSet with CSV file content
+            PreparedStatement statement = conn
+                    .prepareStatement("SELECT SYMBOL, COMPANY FROM "
+                            + STOCK_TABLE);
+            ResultSet phoenixResultSet = statement.executeQuery();
+            parser = new CSVParser(new StringReader(
+                    STOCK_TDV_VALUES_WITH_HEADER), csvUtil.getFormat());
+            for (CSVRecord record : parser) {
+                assertTrue(phoenixResultSet.next());
+                int i = 0;
+                for (String value : record) {
+                    assertEquals(value, phoenixResultSet.getString(i + 1));
+                    i++;
+                }
+            }
+            assertFalse(phoenixResultSet.next());
+        } finally {
+            if (parser != null)
+                parser.close();
+            if (conn != null)
+                conn.close();
+        }
+    }
+
+    @Test
+    public void testCSVUpsertWithCustomDelimiters() throws Exception {
+        CSVParser parser = null;
+        PhoenixConnection conn = null;
+        try {
+            // Create table
+            String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
+                    + "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
+            conn = DriverManager.getConnection(getUrl()).unwrap(
+                    PhoenixConnection.class);
+            PhoenixRuntime.executeStatements(conn,
+                    new StringReader(statements), null);
+
+            // Upsert CSV file
+            CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,
+                    Arrays.<String> asList(STOCK_COLUMNS), true, Arrays.asList(
+                    "1", "2", "3"), CSVCommonsLoader.DEFAULT_ARRAY_ELEMENT_SEPARATOR);
+            csvUtil.upsert(new StringReader(STOCK_CSV_VALUES_WITH_DELIMITER));
+
+            // Compare Phoenix ResultSet with CSV file content
+            PreparedStatement statement = conn
+                    .prepareStatement("SELECT SYMBOL, COMPANY FROM "
+                            + STOCK_TABLE);
+            ResultSet phoenixResultSet = statement.executeQuery();
+            parser = new CSVParser(new StringReader(
+                    STOCK_CSV_VALUES_WITH_DELIMITER), csvUtil.getFormat());
+            for (CSVRecord record : parser) {
+                assertTrue(phoenixResultSet.next());
+                int i = 0;
+                for (String value : record) {
+                    assertEquals(value, phoenixResultSet.getString(i + 1));
+                    i++;
+                }
+            }
+            assertFalse(phoenixResultSet.next());
+        } finally {
+            if (parser != null)
+                parser.close();
+            if (conn != null)
+                conn.close();
+        }
+    }
+
+    @Test
+    public void testCSVUpsertWithColumns() throws Exception {
+        CSVParser parser = null;
+        PhoenixConnection conn = null;
+        try {
+            // Create table
+            String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
+                    + "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
+            conn = DriverManager.getConnection(getUrl())
+                    .unwrap(PhoenixConnection.class);
+            PhoenixRuntime.executeStatements(conn,
+                    new StringReader(statements), null);
+
+            // Upsert CSV file
+            CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,
+                    Arrays.<String> asList(STOCK_COLUMNS), true);
+            // no header
+            csvUtil.upsert(new StringReader(STOCK_CSV_VALUES));
+
+            // Compare Phoenix ResultSet with CSV file content
+            PreparedStatement statement = conn
+                    .prepareStatement("SELECT SYMBOL, COMPANY FROM "
+                            + STOCK_TABLE);
+            ResultSet phoenixResultSet = statement.executeQuery();
+            parser = new CSVParser(new StringReader(
+                    STOCK_CSV_VALUES), csvUtil.getFormat());
+            for (CSVRecord record : parser) {
+                assertTrue(phoenixResultSet.next());
+                int i = 0;
+                for (String value : record) {
+                    assertEquals(value, phoenixResultSet.getString(i + 1));
+                    i++;
+                }
+            }
+
+            assertFalse(phoenixResultSet.next());
+        } finally {
+            if (parser != null)
+                parser.close();
+            if (conn != null)
+                conn.close();
+        }
+    }
+
+    @Test
+    public void testCSVUpsertWithNoColumns() throws Exception {
+        CSVParser parser = null;
+        PhoenixConnection conn = null;
+        try {
+            // Create table
+            String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
+                    + "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
+            conn = DriverManager.getConnection(getUrl())
+                    .unwrap(PhoenixConnection.class);
+            PhoenixRuntime.executeStatements(conn,
+                    new StringReader(statements), null);
+
+            // Upsert CSV file
+            CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,
+                    null, true);
+            csvUtil.upsert(new StringReader(STOCK_CSV_VALUES));
+
+            // Compare Phoenix ResultSet with CSV file content
+            PreparedStatement statement = conn
+                    .prepareStatement("SELECT SYMBOL, COMPANY FROM "
+                            + STOCK_TABLE);
+            ResultSet phoenixResultSet = statement.executeQuery();
+            parser = new CSVParser(new StringReader(
+                    STOCK_CSV_VALUES), csvUtil.getFormat());
+            for (CSVRecord record : parser) {
+                assertTrue(phoenixResultSet.next());
+                int i = 0;
+                for (String value : record) {
+                    assertEquals(value, phoenixResultSet.getString(i + 1));
+                    i++;
+                }
+            }
+
+            assertFalse(phoenixResultSet.next());
+        } finally {
+            if (parser != null)
+                parser.close();
+            if (conn != null)
+                conn.close();
+        }
+    }
+
+    @Test
+    public void testCSVUpsertWithBogusColumn() throws Exception {
+        PhoenixConnection connection = null;
+        CSVParser expectedRows = null;
+        try {
+            // Set up the SYMBOL/COMPANY stock table the loader writes into.
+            connection = DriverManager.getConnection(getUrl())
+                    .unwrap(PhoenixConnection.class);
+            String ddl = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
+                    + "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
+            PhoenixRuntime.executeStatements(connection,
+                    new StringReader(ddl), null);
+
+            // Load the CSV, not strict, using the column list that contains a bogus entry.
+            CSVCommonsLoader loader = new CSVCommonsLoader(connection, STOCK_TABLE,
+                    Arrays.asList(STOCK_COLUMNS_WITH_BOGUS), false);
+            loader.upsert(new StringReader(STOCK_CSV_VALUES));
+
+            // Every CSV record must come back with its key set and COMPANY left null.
+            PreparedStatement query = connection
+                    .prepareStatement("SELECT SYMBOL, COMPANY FROM "
+                            + STOCK_TABLE);
+            ResultSet rows = query.executeQuery();
+            expectedRows = new CSVParser(new StringReader(STOCK_CSV_VALUES),
+                    loader.getFormat());
+            for (CSVRecord expected : expectedRows) {
+                assertTrue(rows.next());
+                assertEquals(expected.get(0), rows.getString(1));
+                assertNull(rows.getString(2));
+            }
+
+            assertFalse(rows.next());
+        } finally {
+            if (expectedRows != null)
+                expectedRows.close();
+            if (connection != null)
+                connection.close();
+        }
+    }
+
+    /**
+     * Expects an "Undefined column" SQLException when every listed column ("FOO", "BAR") is unknown.
+     */
+    @Test
+    public void testCSVUpsertWithAllColumn() throws Exception {
+        PhoenixConnection conn = null;
+        try {
+            // Create table
+            String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
+                    + "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
+            conn = DriverManager.getConnection(getUrl())
+                    .unwrap(PhoenixConnection.class);
+            PhoenixRuntime.executeStatements(conn,
+                    new StringReader(statements), null);
+
+            // Build a loader whose explicit column list names no column of the table
+            CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,
+                    Arrays.asList("FOO", "BAR"), false);
+
+            try {
+                csvUtil.upsert(new StringReader(STOCK_CSV_VALUES));
+                fail();
+            } catch (SQLException e) {
+                assertTrue(
+                        e.getMessage(),
+                        e.getMessage()
+                                .contains(
+                                        "ERROR 504 (42703): Undefined column. columnName=STOCK_SYMBOL.[FOO, BAR]"));
+            }
+        } finally {
+            if (conn != null)
+                conn.close();
+        }
+    }
+
+    /**
+     * Expects an "Undefined column" SQLException naming BOGUS when the loader's last argument is true.
+     */
+    @Test
+    public void testCSVUpsertWithBogusColumnStrict() throws Exception {
+        PhoenixConnection conn = null;
+        try {
+            // Create table
+            String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
+                    + "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);";
+            conn = DriverManager.getConnection(getUrl())
+                    .unwrap(PhoenixConnection.class);
+            PhoenixRuntime.executeStatements(conn,
+                    new StringReader(statements), null);
+
+            // Upsert CSV file with the bogus column list; the upsert itself must throw
+            CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,
+                    Arrays.asList(STOCK_COLUMNS_WITH_BOGUS), true);
+            try {
+                csvUtil.upsert(new StringReader(STOCK_CSV_VALUES));
+                fail();
+            } catch (SQLException e) {
+                assertTrue(
+                        e.getMessage(),
+                        e.getMessage()
+                                .contains(
+                                        "ERROR 504 (42703): Undefined column. columnName=STOCK_SYMBOL.BOGUS"));
+            }
+        } finally {
+            if (conn != null)
+                conn.close();
+        }
+    }
+
+    /**
+     * Loads DATATYPES_CSV_VALUES into the datatype table and checks every column value read back.
+     */
+    @Test
+    public void testAllDatatypes() throws Exception {
+        CSVParser parser = null;
+        PhoenixConnection conn = null;
+        try {
+            // Create table
+            String statements = "CREATE TABLE IF NOT EXISTS "
+                    + DATATYPE_TABLE
+                    + " (CKEY VARCHAR NOT NULL PRIMARY KEY,"
+                    + "  CVARCHAR VARCHAR, CINTEGER INTEGER, CDECIMAL DECIMAL(31,10), CUNSIGNED_INT UNSIGNED_INT, CBOOLEAN BOOLEAN, CBIGINT BIGINT, CUNSIGNED_LONG UNSIGNED_LONG, CTIME TIME, CDATE DATE);";
+            conn = DriverManager.getConnection(getUrl())
+                    .unwrap(PhoenixConnection.class);
+            PhoenixRuntime.executeStatements(conn,
+                    new StringReader(statements), null);
+
+            // Upsert CSV file
+            CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn,
+                    DATATYPE_TABLE, Collections.<String> emptyList(), true);
+            csvUtil.upsert(new StringReader(DATATYPES_CSV_VALUES));
+
+            // Compare Phoenix ResultSet with CSV file content
+            PreparedStatement statement = conn
+                    .prepareStatement("SELECT CKEY, CVARCHAR, CINTEGER, CDECIMAL, CUNSIGNED_INT, CBOOLEAN, CBIGINT, CUNSIGNED_LONG, CTIME, CDATE FROM "
+                            + DATATYPE_TABLE);
+            ResultSet phoenixResultSet = statement.executeQuery();
+            parser = new CSVParser(new StringReader(DATATYPES_CSV_VALUES),
+                    csvUtil.getFormat());
+
+            for (CSVRecord record : parser) {
+                assertTrue(phoenixResultSet.next());
+                // All columns but the trailing CTIME/CDATE pair are compared as upper-cased strings.
+                int plainColumnCount = record.size() - 2;
+                for (int i = 0; i < plainColumnCount; i++) {
+                    assertEquals(record.get(i), phoenixResultSet.getObject(i + 1)
+                            .toString().toUpperCase());
+                }
+                // special case for matching date, time values
+                assertEquals(DateUtil.parseTime(record.get(8)),
+                        phoenixResultSet.getTime("CTIME"));
+                assertEquals(DateUtil.parseDate(record.get(9)),
+                        phoenixResultSet.getDate("CDATE"));
+            }
+
+            assertFalse(phoenixResultSet.next());
+        } finally {
+            if (parser != null)
+                parser.close();
+            if (conn != null)
+                conn.close();
+        }
+    }
+
+    @Test
+    public void testCSVCommonsUpsertEncapsulatedControlChars() throws Exception {
+        PhoenixConnection connection = null;
+        CSVParser expectedRows = null;
+        try {
+            // Set up the MYKEY/MYVALUE table that receives the CSV fields.
+            connection = DriverManager.getConnection(getUrl())
+                    .unwrap(PhoenixConnection.class);
+            String ddl = "CREATE TABLE IF NOT EXISTS "
+                    + ENCAPSULATED_CHARS_TABLE
+                    + "(MYKEY VARCHAR NOT NULL PRIMARY KEY, MYVALUE VARCHAR);";
+            PhoenixRuntime.executeStatements(connection,
+                    new StringReader(ddl), null);
+
+            // Load the CSV; an empty column list is handed to the loader.
+            CSVCommonsLoader loader = new CSVCommonsLoader(connection,
+                    ENCAPSULATED_CHARS_TABLE, Collections.<String> emptyList(),
+                    true);
+            loader.upsert(new StringReader(
+                    CSV_VALUES_ENCAPSULATED_CONTROL_CHARS_WITH_HEADER));
+
+            // Re-parse the same CSV with the loader's format and check that the
+            // table returns every field of every record, in order, unchanged.
+            PreparedStatement query = connection
+                    .prepareStatement("SELECT MYKEY, MYVALUE FROM "
+                            + ENCAPSULATED_CHARS_TABLE);
+            ResultSet rows = query.executeQuery();
+            expectedRows = new CSVParser(new StringReader(
+                    CSV_VALUES_ENCAPSULATED_CONTROL_CHARS_WITH_HEADER),
+                    loader.getFormat());
+            for (CSVRecord expected : expectedRows) {
+                assertTrue(rows.next());
+                // JDBC columns are 1-based, CSV fields 0-based.
+                for (int field = 0; field < expected.size(); field++) {
+                    assertEquals(expected.get(field), rows.getString(field + 1));
+                }
+            }
+
+            assertFalse(rows.next());
+        } finally {
+            if (expectedRows != null)
+                expectedRows.close();
+            if (connection != null)
+                connection.close();
+        }
+    }
+
+    /**
+     * Expects upserting the BAD_ENCAPSULATED CSV constant to fail with a RuntimeException.
+     */
+    @Test
+    public void testCSVCommonsUpsertBadEncapsulatedControlChars()
+            throws Exception {
+        PhoenixConnection conn = null;
+        try {
+            // Create table
+            String statements = "CREATE TABLE IF NOT EXISTS "
+                    + ENCAPSULATED_CHARS_TABLE
+                    + "(MYKEY VARCHAR NOT NULL PRIMARY KEY, MYVALUE VARCHAR);";
+            conn = DriverManager.getConnection(getUrl())
+                    .unwrap(PhoenixConnection.class);
+            PhoenixRuntime.executeStatements(conn,
+                    new StringReader(statements), null);
+
+            // Upsert CSV file; the upsert must throw before fail() is reached
+            CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn,
+                    ENCAPSULATED_CHARS_TABLE, Collections.<String> emptyList(),
+                    true);
+            try {
+                csvUtil.upsert(new StringReader(
+                        CSV_VALUES_BAD_ENCAPSULATED_CONTROL_CHARS_WITH_HEADER));
+                fail();
+            } catch (RuntimeException e) {
+                assertTrue(
+                        e.getMessage(),
+                        e.getMessage()
+                                .contains(
+                                        "invalid char between encapsulated token and delimiter"));
+            }
+        } finally {
+            if (conn != null)
+                conn.close();
+        }
+    }
+
+    /**
+     * Loads one row whose VALARRAY field is "2!3!4" and expects it back as the INTEGER array {2, 3, 4}.
+     */
+    @Test
+    public void testCSVCommonsUpsert_WithArray() throws Exception {
+        PhoenixConnection conn = null;
+        try {
+
+            // Create table
+            String statements = "CREATE TABLE IF NOT EXISTS ARRAY_TABLE "
+                    + "(ID BIGINT NOT NULL PRIMARY KEY, VALARRAY INTEGER ARRAY);";
+            conn = DriverManager.getConnection(getUrl()).unwrap(
+                    PhoenixConnection.class);
+            PhoenixRuntime.executeStatements(conn,
+                    new StringReader(statements), null);
+
+            // Upsert CSV data; "!" is the loader's last argument and separates the array elements
+            CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, "ARRAY_TABLE",
+                    Collections.<String> emptyList(), true, null, "!");
+            csvUtil.upsert(
+                    new StringReader("ID,VALARRAY\n"
+                            + "1,2!3!4\n"));
+
+            // Expect exactly one row holding the ID and the parsed array
+            PreparedStatement statement = conn
+                    .prepareStatement("SELECT ID, VALARRAY FROM ARRAY_TABLE");
+            ResultSet phoenixResultSet = statement.executeQuery();
+            assertTrue(phoenixResultSet.next());
+            assertEquals(1L, phoenixResultSet.getLong(1));
+            assertEquals(
+                    PArrayDataType.instantiatePhoenixArray(PDataType.INTEGER, new Integer[]{2, 3, 4}),
+                    phoenixResultSet.getArray(2));
+            assertFalse(phoenixResultSet.next());
+        } finally {
+            if (conn != null)
+                conn.close();
+        }
+    }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f15c7f1e/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
new file mode 100644
index 0000000..39bde72
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+import java.sql.Types;
+
+import static org.junit.Assert.assertEquals;
+
+/** Checks the UPSERT statement text produced by the QueryUtil builder methods. */
+public class QueryUtilTest {
+
+    private static final String TABLE_NAME = "MYTAB";
+    private static final ColumnInfo ID_COLUMN = new ColumnInfo("ID", Types.BIGINT);
+    private static final ColumnInfo NAME_COLUMN = new ColumnInfo("NAME", Types.VARCHAR);
+
+    @Test
+    public void testConstructUpsertStatement_ColumnInfos() {
+        assertEquals("UPSERT INTO MYTAB (ID, NAME) VALUES (?, ?)",
+                QueryUtil.constructUpsertStatement(TABLE_NAME, ImmutableList.of(ID_COLUMN, NAME_COLUMN)));
+    }
+
+    // An empty column list must be rejected with IllegalArgumentException.
+    @Test(expected=IllegalArgumentException.class)
+    public void testConstructUpsertStatement_ColumnInfos_NoColumns() {
+        QueryUtil.constructUpsertStatement(TABLE_NAME, ImmutableList.<ColumnInfo>of());
+    }
+
+    @Test
+    public void testConstructGenericUpsertStatement() {
+        assertEquals("UPSERT INTO MYTAB VALUES (?, ?)",
+                QueryUtil.constructGenericUpsertStatement(TABLE_NAME, 2));
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void testConstructGenericUpsertStatement_NoColumns() {
+        QueryUtil.constructGenericUpsertStatement(TABLE_NAME, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f15c7f1e/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
new file mode 100644
index 0000000..21b985c
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util.csv;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.schema.PArrayDataType;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.List;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+public class CsvUpsertExecutorTest extends BaseConnectionlessQueryTest {
+
+    private Connection conn;
+    private List<ColumnInfo> columnInfoList;
+    private PreparedStatement preparedStatement;
+    private CsvUpsertExecutor.UpsertListener upsertListener;
+
+    private CsvUpsertExecutor upsertExecutor;
+
+    @Before
+    public void setUp() throws SQLException {
+        columnInfoList = ImmutableList.of(
+                new ColumnInfo("ID", Types.BIGINT),
+                new ColumnInfo("NAME", Types.VARCHAR),
+                new ColumnInfo("AGE", Types.INTEGER),
+                new ColumnInfo("VALUES", PDataType.INTEGER_ARRAY.getSqlType()));
+
+        preparedStatement = mock(PreparedStatement.class);
+        upsertListener = mock(CsvUpsertExecutor.UpsertListener.class);
+        conn = DriverManager.getConnection(getUrl());
+        upsertExecutor = new CsvUpsertExecutor(conn, columnInfoList, preparedStatement, upsertListener, ":");
+    }
+
+    @After
+    public void tearDown() throws SQLException {
+        conn.close();
+    }
+
+    @Test
+    public void testExecute() throws Exception {
+        upsertExecutor.execute(createCsvRecord("123,NameValue,42,1:2:3"));
+
+        verify(upsertListener).upsertDone(1L);
+        verifyNoMoreInteractions(upsertListener);
+
+        verify(preparedStatement).setObject(1, Long.valueOf(123L));
+        verify(preparedStatement).setObject(2, "NameValue");
+        verify(preparedStatement).setObject(3, Integer.valueOf(42));
+        verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PDataType.INTEGER, new Object[]{1,2,3}));
+        verify(preparedStatement).execute();
+        verifyNoMoreInteractions(preparedStatement);
+    }
+
+    @Test
+    public void testExecute_TooFewFields() throws Exception {
+        CSVRecord csvRecordWithTooFewFields = createCsvRecord("123,NameValue");
+        upsertExecutor.execute(csvRecordWithTooFewFields);
+
+        verify(upsertListener).errorOnRecord(eq(csvRecordWithTooFewFields), anyString());
+        verifyNoMoreInteractions(upsertListener);
+    }
+
+    @Test
+    public void testExecute_TooManyFields() throws Exception {
+        CSVRecord csvRecordWithTooManyFields = createCsvRecord("123,NameValue,42,1:2:3,Garbage");
+        upsertExecutor.execute(csvRecordWithTooManyFields);
+
+        verify(upsertListener).upsertDone(1L);
+        verifyNoMoreInteractions(upsertListener);
+
+        verify(preparedStatement).setObject(1, Long.valueOf(123L));
+        verify(preparedStatement).setObject(2, "NameValue");
+        verify(preparedStatement).setObject(3, Integer.valueOf(42));
+        verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PDataType.INTEGER, new Object[]{1,2,3}));
+        verify(preparedStatement).execute();
+        verifyNoMoreInteractions(preparedStatement);
+    }
+
+    @Test
+    public void testExecute_NullField() throws Exception {
+        upsertExecutor.execute(createCsvRecord("123,NameValue,,1:2:3"));
+
+        verify(upsertListener).upsertDone(1L);
+        verifyNoMoreInteractions(upsertListener);
+
+        verify(preparedStatement).setObject(1, Long.valueOf(123L));
+        verify(preparedStatement).setObject(2, "NameValue");
+        verify(preparedStatement).setNull(3, columnInfoList.get(2).getSqlType());
+        verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PDataType.INTEGER, new Object[]{1,2,3}));
+        verify(preparedStatement).execute();
+        verifyNoMoreInteractions(preparedStatement);
+    }
+
+    @Test
+    public void testExecute_InvalidType() throws Exception {
+        CSVRecord csvRecordWithInvalidType = createCsvRecord("123,NameValue,ThisIsNotANumber,1:2:3");
+        upsertExecutor.execute(csvRecordWithInvalidType);
+
+        verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), anyString());
+        verifyNoMoreInteractions(upsertListener);
+    }
+
+    private CSVRecord createCsvRecord(String...columnValues) throws IOException {
+        String inputRecord = Joiner.on(',').join(columnValues);
+        return Iterables.getFirst(CSVParser.parse(inputRecord, CSVFormat.DEFAULT), null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f15c7f1e/phoenix-core/src/test/java/org/apache/phoenix/util/csv/StringToArrayConverterTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/StringToArrayConverterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/StringToArrayConverterTest.java
new file mode 100644
index 0000000..8f2b0e5
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/StringToArrayConverterTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util.csv;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.schema.PDataType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class StringToArrayConverterTest extends BaseConnectionlessQueryTest {
+
+    private Connection conn;
+    private StringToArrayConverter converter;
+
+    @Before
+    public void setUp() throws SQLException {
+        conn = DriverManager.getConnection(getUrl());
+        converter = new StringToArrayConverter(conn, ":", PDataType.VARCHAR);
+    }
+
+    @After
+    public void tearDown() throws SQLException {
+        conn.close();
+    }
+
+    @Test
+    public void testToArray_EmptyString() throws SQLException {
+        Array emptyArray = converter.toArray("");
+        assertEquals(0, ((Object[]) emptyArray.getArray()).length);
+    }
+
+
+    @Test
+    public void testToArray_SingleElement() throws SQLException {
+        Array singleElementArray = converter.toArray("value");
+        assertArrayEquals(
+                new Object[]{"value"},
+                (Object[]) singleElementArray.getArray());
+    }
+
+    @Test
+    public void testToArray_MultipleElements() throws SQLException {
+        Array multiElementArray = converter.toArray("one:two");
+        assertArrayEquals(
+                new Object[]{"one", "two"},
+                (Object[]) multiElementArray.getArray());
+    }
+
+    @Test
+    public void testToArray_IntegerValues() throws SQLException {
+        StringToArrayConverter intArrayConverter = new StringToArrayConverter(
+                                                            conn, ":", PDataType.INTEGER);
+        Array intArray = intArrayConverter.toArray("1:2:3");
+        assertArrayEquals(
+                new int[]{1, 2, 3},
+                (int[]) intArray.getArray());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f15c7f1e/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
index 37bf464..fddcba5 100644
--- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
@@ -27,6 +27,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -190,7 +191,7 @@ public abstract class BaseEventSerializer implements EventSerializer {
                 position++;
             }
             
-            this.upsertStatement = QueryUtil.constructUpsertStatement(columnMetadata, fullTableName, columnMetadata.length);
+            this.upsertStatement = QueryUtil.constructUpsertStatement(fullTableName, Arrays.asList(columnMetadata));
             logger.info(" the upsert statement is {} " ,this.upsertStatement);
             
         }  catch (SQLException e) {

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f15c7f1e/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
index 12cc6ed..3b0551f 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
@@ -118,7 +118,7 @@ public class PhoenixPigConfiguration {
 		}
 		
 		// Generating UPSERT statement without column name information.
-		String upsertStmt = QueryUtil.constructUpsertStatement(null, getTableName(), columnMetadataList.size());
+		String upsertStmt = QueryUtil.constructGenericUpsertStatement(getTableName(), columnMetadataList.size());
 		LOG.info("Phoenix Upsert Statement: " + upsertStmt);
 		conf.set(UPSERT_STATEMENT, upsertStmt);
 	}


Mime
View raw message