phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [5/5] git commit: PHOENIX-66 Support array creation from CSV file (GabrielReid)
Date Wed, 12 Mar 2014 00:24:59 GMT
PHOENIX-66 Support array creation from CSV file (GabrielReid)


Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/f15c7f1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/f15c7f1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/f15c7f1e

Branch: refs/heads/4.0
Commit: f15c7f1ec21e50d1bf22a1af63f7e29088eb8fee
Parents: e370cc5
Author: jamestaylor <jamestaylor@apache.org>
Authored: Tue Mar 11 17:06:04 2014 -0700
Committer: jamestaylor <jamestaylor@apache.org>
Committed: Tue Mar 11 17:20:01 2014 -0700

----------------------------------------------------------------------
 .../apache/phoenix/util/CSVCommonsLoader.java   |  631 +++++------
 .../org/apache/phoenix/util/PhoenixRuntime.java |    9 +-
 .../java/org/apache/phoenix/util/QueryUtil.java |  129 ++-
 .../phoenix/util/csv/CsvUpsertExecutor.java     |  222 ++++
 .../util/csv/StringToArrayConverter.java        |   93 ++
 .../phoenix/end2end/CSVCommonsLoaderTest.java   | 1071 +++++++++---------
 .../org/apache/phoenix/util/QueryUtilTest.java  |   56 +
 .../phoenix/util/csv/CsvUpsertExecutorTest.java |  144 +++
 .../util/csv/StringToArrayConverterTest.java    |   82 ++
 .../flume/serializer/BaseEventSerializer.java   |    3 +-
 .../phoenix/pig/PhoenixPigConfiguration.java    |    2 +-
 11 files changed, 1539 insertions(+), 903 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f15c7f1e/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java
index 1947f53..3656ce3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.util;
 import java.io.File;
 import java.io.Reader;
 import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -35,26 +34,31 @@ import org.apache.commons.csv.CSVRecord;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.csv.CsvUpsertExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /***
  * Upserts CSV data using Phoenix JDBC connection
- * 
- * 
- * 
  */
 public class CSVCommonsLoader {
 
-	private final PhoenixConnection conn;
-	private final String tableName;
-	private final List<String> columns;
-	private final boolean isStrict;
+    private static final Logger LOG = LoggerFactory.getLogger(CSVCommonsLoader.class);
+
+    public static final String DEFAULT_ARRAY_ELEMENT_SEPARATOR = ":";
+
+    private final PhoenixConnection conn;
+    private final String tableName;
+    private final List<String> columns;
+    private final boolean isStrict;
     boolean userSuppliedMetaCharacters = false;
-	private final List<String> customMetaCharacters;
-	private PhoenixHeaderSource headerSource = PhoenixHeaderSource.FROM_TABLE;
-	private final CSVFormat format;
+    private final List<String> customMetaCharacters;
+    private PhoenixHeaderSource headerSource = PhoenixHeaderSource.FROM_TABLE;
+    private final CSVFormat format;
     private final Map<String,Character> ctrlTable = new HashMap<String,Character>() {
         {   put("1",'\u0001');
             put("2",'\u0002');
@@ -66,102 +70,104 @@ public class CSVCommonsLoader {
             put("8",'\u0008');
             put("9",'\u0009');}};
 
-	private int unfoundColumnCount;
-	
+    private final String arrayElementSeparator;
+    private int unfoundColumnCount;
+
     public enum PhoenixHeaderSource {
-    	FROM_TABLE,
-    	IN_LINE,
-    	SUPPLIED_BY_USER
+        FROM_TABLE,
+        IN_LINE,
+        SUPPLIED_BY_USER
+    }
+
+    public CSVCommonsLoader(PhoenixConnection conn, String tableName,
+            List<String> columns, boolean isStrict) {
+        this(conn, tableName, columns, isStrict, null, DEFAULT_ARRAY_ELEMENT_SEPARATOR);
+    }
+
+    public CSVCommonsLoader(PhoenixConnection conn, String tableName,
+            List<String> columns, boolean isStrict, List<String> customMetaCharacters, String arrayElementSeparator) {
+        this.conn = conn;
+        this.tableName = tableName;
+        this.columns = columns;
+        this.isStrict = isStrict;
+        this.customMetaCharacters = customMetaCharacters;
+        if (customMetaCharacters==null || customMetaCharacters.size()==0) {
+            userSuppliedMetaCharacters=false;
+        } else if (customMetaCharacters.size()==3) {
+            userSuppliedMetaCharacters=true;
+        }
+        else{
+            throw new IllegalArgumentException(
+                    String.format("customMetaCharacters must have no elements or three elements. Supplied value is %s",
+                            buildStringFromList(customMetaCharacters)));
+        }
+
+        // implicit in the columns value.
+        if (columns !=null && !columns.isEmpty()) {
+            headerSource = PhoenixHeaderSource.SUPPLIED_BY_USER;
+        }
+        else if (columns != null && columns.isEmpty()) {
+            headerSource = PhoenixHeaderSource.IN_LINE;
+        }
+
+        this.arrayElementSeparator = arrayElementSeparator;
+        this.format = buildFormat();
+    }
+
+    public CSVFormat getFormat() {
+        return format;
     }
 
-	public CSVCommonsLoader(PhoenixConnection conn, String tableName,
-			List<String> columns, boolean isStrict) {
-		this(conn, tableName, columns, isStrict, null);
-	}
-
-	public CSVCommonsLoader(PhoenixConnection conn, String tableName,
-			List<String> columns, boolean isStrict, List<String> customMetaCharacters) {
-		this.conn = conn;
-		this.tableName = tableName;
-		this.columns = columns;
-		this.isStrict = isStrict;
-		this.customMetaCharacters = customMetaCharacters;
-		if (customMetaCharacters==null || customMetaCharacters.size()==0) {
-			userSuppliedMetaCharacters=false;
-		} else if (customMetaCharacters.size()==3) {
-			userSuppliedMetaCharacters=true;
-		}
-		else{
-			throw new IllegalArgumentException(
-					String.format("customMetaCharacters must have no elements or three elements. Supplied value is %s",
-							buildStringFromList(customMetaCharacters)));
-		}
-		
-		// implicit in the columns value.
-		if (columns !=null && !columns.isEmpty()) {
-			headerSource = PhoenixHeaderSource.SUPPLIED_BY_USER;
-		}
-		else if (columns != null && columns.isEmpty()) {
-			headerSource = PhoenixHeaderSource.IN_LINE;
-		}
-		
-		this.format = buildFormat();
-	}
-	
-	public CSVFormat getFormat() {
-		return format;
-	}
-	
-	/**
+    /**
      * default settings
      * delimiter = ',' 
      * quoteChar = '"',
      * escape = null
      * recordSeparator = CRLF, CR, or LF
      * ignore empty lines allows the last data line to have a recordSeparator
-	 * 
-	 * @return CSVFormat based on constructor settings.
-	 */
-	private CSVFormat buildFormat() {
+     *
+     * @return CSVFormat based on constructor settings.
+     */
+    private CSVFormat buildFormat() {
         CSVFormat format = CSVFormat.DEFAULT
                 .withIgnoreEmptyLines(true);
         if (userSuppliedMetaCharacters) {
-        	// list error checking handled in constructor above. 
-        	// use 0 to keep default setting
-        	String delimiter = customMetaCharacters.get(0);
-        	String quote = customMetaCharacters.get(1);
-        	String escape = customMetaCharacters.get(2);
-        	
-        	if (!"0".equals(delimiter)) {
-        		format = format.withDelimiter(getCustomMetaCharacter(delimiter));
-        	}
-        	if (!"0".equals(quote)) {
-        		format = format.withQuoteChar(getCustomMetaCharacter(quote));
-        	}
-        	if (!"0".equals(quote)) {
-        		format = format.withEscape(getCustomMetaCharacter(escape));
-        	}
-        	
+            // list error checking handled in constructor above.
+            // use 0 to keep default setting
+            String delimiter = customMetaCharacters.get(0);
+            String quote = customMetaCharacters.get(1);
+            String escape = customMetaCharacters.get(2);
+
+            if (!"0".equals(delimiter)) {
+                format = format.withDelimiter(getCustomMetaCharacter(delimiter));
+            }
+            if (!"0".equals(quote)) {
+                format = format.withQuoteChar(getCustomMetaCharacter(quote));
+            }
+            if (!"0".equals(quote)) {
+                format = format.withEscape(getCustomMetaCharacter(escape));
+            }
+
         }
         switch(headerSource) {
         case FROM_TABLE:
-      	  // obtain headers from table, so format should not expect a header.
-      	break;
+            // obtain headers from table, so format should not expect a header.
+            break;
         case IN_LINE:
-      	  // an empty string array triggers csv loader to grab the first line as the header
-      	  format = format.withHeader(new String[0]);
-      	break;
+            // an empty string array triggers csv loader to grab the first line as the header
+            format = format.withHeader(new String[0]);
+            break;
         case SUPPLIED_BY_USER:
-      	  // a populated string array supplied by the user
-      	  format = format.withHeader(columns.toArray(new String[columns.size()]));
-      	break;
-      default:
-      	throw new RuntimeException("Header source was unable to be inferred.");
-      		
-      }
+            // a populated string array supplied by the user
+            format = format.withHeader(columns.toArray(new String[columns.size()]));
+            break;
+        default:
+            throw new RuntimeException("Header source was unable to be inferred.");
+
+        }
         return format;
-	}
-        
+    }
+
 
     public char getCustomMetaCharacter(String field) {
         if(this.ctrlTable.containsKey(field)) {
@@ -172,248 +178,205 @@ public class CSVCommonsLoader {
     }
 
     /**
-	 * Upserts data from CSV file. 
-	 * 
-	 * Data is batched up based on connection batch size. 
-	 * Column PDataType is read from metadata and is used to convert
-	 * column value to correct type before upsert. 
-	 * 
-	 * The constructor determines the format for the CSV files.
-	 * 
-	 * @param fileName
-	 * @throws Exception
-	 */
-	public void upsert(String fileName) throws Exception {
-		CSVParser parser = CSVParser.parse(new File(fileName), 
-				format); 
-		upsert(parser);
-	}
-	
-	public void upsert(Reader reader) throws Exception {
-		CSVParser parser = new CSVParser(reader,format); 
-		upsert(parser);
-	}
-
-	private static <T> String buildStringFromList(List<T> list) {
-		StringBuilder sb = new StringBuilder();
-		boolean first = true;
-		for (T val : list) {
-			if (first) {
-				sb.append((val==null?"null":val.toString()));
-				first = false;
-			} else {
-				sb.append(", " + (val==null?"null":val.toString()));
-			}
-		}
-		return sb.toString();
-	}
-
-	/**
-	 * Data is batched up based on connection batch size. 
-	 * Column PDataType is read from metadata and is used to convert
-	 * column value to correct type before upsert. 
-	 * 
-	 * The format is determined by the supplied parser. 
-
-	 * @param parser
-	 *            CSVParser instance
-	 * @throws Exception
-	 */
-	public void upsert(CSVParser parser) throws Exception {
-		List<String> columns = this.columns;
-		switch (headerSource) {
-		case FROM_TABLE:
-			System.out.println(String.format("csv columns from database."));
-			break;
-		case IN_LINE: 
-			columns = new ArrayList<String>(); 
-			for (String colName : parser.getHeaderMap().keySet()) {
-				columns.add(colName); // iterates in column order
-			}
-			System.out.println(String.format("csv columns from header line. length=%s, %s",
-					columns.size(), buildStringFromList(columns)));
-			break;
-		case SUPPLIED_BY_USER:
-			System.out.println(String.format("csv columns from user. length=%s, %s",
-					columns.size(), buildStringFromList(columns)));
-			break;
-		default:
-			throw new IllegalStateException("parser has unknown column source.");
-		}
-		ColumnInfo[] columnInfo = generateColumnInfo(columns);
-		System.out.println(String.format("phoenix columnInfo length=%s, %s",
-				columnInfo.length,
-				buildStringFromList(Arrays.asList(columnInfo))));
-		PreparedStatement stmt = null;
-		PreparedStatement[] stmtCache = null;
-		if (columns == null) {
-			stmtCache = new PreparedStatement[columnInfo.length];
-		} else {
-			String upsertStatement = QueryUtil.constructUpsertStatement(
-					columnInfo, tableName, columnInfo.length
-							- unfoundColumnCount);
-			System.out.println(String.format("prepared statement %s",
-					upsertStatement));
-			stmt = conn.prepareStatement(upsertStatement);
-		}
-		int rowCount = 0;
-		int upsertBatchSize = conn.getMutateBatchSize();
-		boolean wasAutoCommit = conn.getAutoCommit();
-		try {
-			conn.setAutoCommit(false);
-			Object upsertValue = null;
-			long start = System.currentTimeMillis();
-
-			int count = 0;
-
-			// Upsert data based on SqlType of each column
-			for (CSVRecord nextRecord : parser) {
-				count++;
-				//TODO expose progress monitor setting
-				if (count % 1000 == 0) { 
-					System.out.println(String.format(
-							"processing line line %s, %s=%s", count,
-							columnInfo[0].getColumnName(), nextRecord.get(0)));
-				}
-				if (columns == null) {
-					stmt = stmtCache[nextRecord.size() - 1];
-					if (stmt == null) {
-						String upsertStatement = QueryUtil
-								.constructUpsertStatement(columnInfo,
-										tableName, nextRecord.size());
-						stmt = conn.prepareStatement(upsertStatement);
-						stmtCache[nextRecord.size() - 1] = stmt;
-					}
-				}
-
-				// skip inconsistent line,
-				if (!nextRecord.isConsistent()) { 
-					System.out.println(String.format(
-							"Unable to process line number %s"
-									+ ", line columns %s"
-									+ ", expected columns %s\n%s", count,
-							nextRecord.size(), columnInfo.length,
-							nextRecord.toString()));
-					continue;
-				}
-				for (int index = 0; index < columnInfo.length; index++) {
-					if (columnInfo[index] == null) {
-						continue;
-					}
-					String fieldValue = nextRecord.get(index);
-					Integer info = columnInfo[index].getSqlType();
-					upsertValue = convertTypeSpecificValue(fieldValue, info);
-					if (upsertValue != null) {
-						stmt.setObject(index + 1, upsertValue,
-								columnInfo[index].getSqlType());
-					} else {
-						stmt.setNull(index + 1, columnInfo[index].getSqlType());
-					}
-				}
-				stmt.execute();
-
-				// Commit when batch size is reached
-				if (++rowCount % upsertBatchSize == 0) {
-					conn.commit();
-					System.out.println("Rows upserted: " + rowCount);
-				}
-			}
-			conn.commit();
-			double elapsedDuration = ((System.currentTimeMillis() - start) / 1000.0);
-			System.out.println("CSV Upsert complete. " + rowCount
-					+ " rows upserted");
-			System.out.println("Time: " + elapsedDuration + " sec(s)\n");
-		} finally {
-			if (stmt != null) {
-				stmt.close();
-			}
-			// release reader resources.
-			if (parser != null) {
-				parser.close();
-			}
-			if (wasAutoCommit)
-				conn.setAutoCommit(true);
-		}
-	}
-
-	/**
-	 * Gets CSV string input converted to correct type
-	 */
-	private Object convertTypeSpecificValue(String s, Integer sqlType)
-			throws Exception {
-	    return PDataType.fromTypeId(sqlType).toObject(s);
-	}
-
-
-	/**
-	 * Get array of ColumnInfos that contain Column Name and its associated
-	 * PDataType
-	 * 
-	 * @param columns
-	 * @return
-	 * @throws SQLException
-	 */
-	private ColumnInfo[] generateColumnInfo(List<String> columns)
-			throws SQLException {
-		Map<String, Integer> columnNameToTypeMap = Maps.newLinkedHashMap();
-		DatabaseMetaData dbmd = conn.getMetaData();
-		// TODO: escape wildcard characters here because we don't want that
-		// behavior here
-		String escapedTableName = StringUtil.escapeLike(tableName);
-		String[] schemaAndTable = escapedTableName.split("\\.");
-		ResultSet rs = null;
-		try {
-			rs = dbmd.getColumns(null, (schemaAndTable.length == 1 ? ""
-					: schemaAndTable[0]),
-					(schemaAndTable.length == 1 ? escapedTableName
-							: schemaAndTable[1]), null);
-			while (rs.next()) {
-				columnNameToTypeMap.put(
-						rs.getString(QueryUtil.COLUMN_NAME_POSITION),
-						rs.getInt(QueryUtil.DATA_TYPE_POSITION));
-			}
-		} finally {
-			if (rs != null) {
-				rs.close();
-			}
-		}
-		ColumnInfo[] columnType;
-		if (columns == null) {
-			int i = 0;
-			columnType = new ColumnInfo[columnNameToTypeMap.size()];
-			for (Map.Entry<String, Integer> entry : columnNameToTypeMap
-					.entrySet()) {
-				columnType[i++] = new ColumnInfo(entry.getKey(),
-						entry.getValue());
-			}
-		} else {
-			// Leave "null" as indication to skip b/c it doesn't exist
-			columnType = new ColumnInfo[columns.size()];
-			for (int i = 0; i < columns.size(); i++) {
-				String columnName = SchemaUtil.normalizeIdentifier(columns.get(
-						i).trim());
-				Integer sqlType = columnNameToTypeMap.get(columnName);
-				if (sqlType == null) {
-					if (isStrict) {
-						throw new SQLExceptionInfo.Builder(
-								SQLExceptionCode.COLUMN_NOT_FOUND)
-								.setColumnName(columnName)
-								.setTableName(tableName).build()
-								.buildException();
-					}
-					unfoundColumnCount++;
-				} else {
-					columnType[i] = new ColumnInfo(columnName, sqlType);
-				}
-			}
-			if (unfoundColumnCount == columns.size()) {
-				throw new SQLExceptionInfo.Builder(
-						SQLExceptionCode.COLUMN_NOT_FOUND)
-						.setColumnName(
-								Arrays.toString(columns.toArray(new String[0])))
-						.setTableName(tableName).build().buildException();
-			}
-		}
-		return columnType;
-	}
+     * Upserts data from CSV file.
+     *
+     * Data is batched up based on connection batch size.
+     * Column PDataType is read from metadata and is used to convert
+     * column value to correct type before upsert.
+     *
+     * The constructor determines the format for the CSV files.
+     *
+     * @param fileName
+     * @throws Exception
+     */
+    public void upsert(String fileName) throws Exception {
+        CSVParser parser = CSVParser.parse(new File(fileName),
+                format);
+        upsert(parser);
+    }
+
+    public void upsert(Reader reader) throws Exception {
+        CSVParser parser = new CSVParser(reader,format);
+        upsert(parser);
+    }
+
+    private static <T> String buildStringFromList(List<T> list) {
+        return Joiner.on(", ").useForNull("null").join(list);
+    }
+
+    /**
+     * Data is batched up based on connection batch size.
+     * Column PDataType is read from metadata and is used to convert
+     * column value to correct type before upsert.
+     *
+     * The format is determined by the supplied csvParser.
+
+     * @param csvParser
+     *            CSVParser instance
+     * @throws Exception
+     */
+    public void upsert(CSVParser csvParser) throws Exception {
+        List<ColumnInfo> columnInfoList = buildColumnInfoList(csvParser);
+
+        boolean wasAutoCommit = conn.getAutoCommit();
+        try {
+            conn.setAutoCommit(false);
+            long start = System.currentTimeMillis();
+            CsvUpsertListener upsertListener = new CsvUpsertListener(conn, conn.getMutateBatchSize());
+            CsvUpsertExecutor csvUpsertExecutor = CsvUpsertExecutor.create(conn, tableName,
+                    columnInfoList, upsertListener, arrayElementSeparator);
+
+            csvUpsertExecutor.execute(csvParser);
+            csvUpsertExecutor.close();
+
+            conn.commit();
+            double elapsedDuration = ((System.currentTimeMillis() - start) / 1000.0);
+            System.out.println("CSV Upsert complete. " + upsertListener.getTotalUpsertCount()
+                    + " rows upserted");
+            System.out.println("Time: " + elapsedDuration + " sec(s)\n");
+
+        } finally {
+
+            // release reader resources.
+            if (csvParser != null) {
+                csvParser.close();
+            }
+            if (wasAutoCommit) {
+                conn.setAutoCommit(true);
+            }
+        }
+    }
+
+    private List<ColumnInfo> buildColumnInfoList(CSVParser parser) throws SQLException {
+        List<String> columns = this.columns;
+        switch (headerSource) {
+        case FROM_TABLE:
+            System.out.println(String.format("csv columns from database."));
+            break;
+        case IN_LINE:
+            columns = new ArrayList<String>();
+            for (String colName : parser.getHeaderMap().keySet()) {
+                columns.add(colName); // iterates in column order
+            }
+            System.out.println(String.format("csv columns from header line. length=%s, %s",
+                    columns.size(), buildStringFromList(columns)));
+            break;
+        case SUPPLIED_BY_USER:
+            System.out.println(String.format("csv columns from user. length=%s, %s",
+                    columns.size(), buildStringFromList(columns)));
+            break;
+        default:
+            throw new IllegalStateException("parser has unknown column source.");
+        }
+        return generateColumnInfo(columns);
+    }
+
+    /**
+     * Get array of ColumnInfos that contain Column Name and its associated
+     * PDataType
+     *
+     * @param columns
+     * @return
+     * @throws SQLException
+     */
+    private List<ColumnInfo> generateColumnInfo(List<String> columns)
+            throws SQLException {
+        Map<String, Integer> columnNameToTypeMap = Maps.newLinkedHashMap();
+        DatabaseMetaData dbmd = conn.getMetaData();
+        // TODO: escape wildcard characters here because we don't want that
+        // behavior here
+        String escapedTableName = StringUtil.escapeLike(tableName);
+        String[] schemaAndTable = escapedTableName.split("\\.");
+        ResultSet rs = null;
+        try {
+            rs = dbmd.getColumns(null, (schemaAndTable.length == 1 ? ""
+                    : schemaAndTable[0]),
+                    (schemaAndTable.length == 1 ? escapedTableName
+                            : schemaAndTable[1]), null);
+            while (rs.next()) {
+                columnNameToTypeMap.put(
+                        rs.getString(QueryUtil.COLUMN_NAME_POSITION),
+                        rs.getInt(QueryUtil.DATA_TYPE_POSITION));
+            }
+        } finally {
+            if (rs != null) {
+                rs.close();
+            }
+        }
+        List<ColumnInfo> columnInfoList = Lists.newArrayList();
+        if (columns == null) {
+            for (Map.Entry<String, Integer> entry : columnNameToTypeMap
+                    .entrySet()) {
+                columnInfoList.add(new ColumnInfo(entry.getKey(), entry.getValue()));
+            }
+        } else {
+            // Leave "null" as indication to skip b/c it doesn't exist
+            for (int i = 0; i < columns.size(); i++) {
+                String columnName = columns.get(i).trim();
+                Integer sqlType = columnNameToTypeMap.get(columnName);
+                if (sqlType == null) {
+                    if (isStrict) {
+                        throw new SQLExceptionInfo.Builder(
+                                SQLExceptionCode.COLUMN_NOT_FOUND)
+                                .setColumnName(columnName)
+                                .setTableName(tableName).build()
+                                .buildException();
+                    }
+                    unfoundColumnCount++;
+                } else {
+                    columnInfoList.add(new ColumnInfo(columnName, sqlType));
+                }
+            }
+            if (unfoundColumnCount == columns.size()) {
+                throw new SQLExceptionInfo.Builder(
+                        SQLExceptionCode.COLUMN_NOT_FOUND)
+                        .setColumnName(
+                                Arrays.toString(columns.toArray(new String[0])))
+                        .setTableName(tableName).build().buildException();
+            }
+        }
+        return columnInfoList;
+    }
+
+    static class CsvUpsertListener implements CsvUpsertExecutor.UpsertListener {
+
+        private final PhoenixConnection conn;
+        private final int upsertBatchSize;
+        private long totalUpserts = 0L;
+
+        CsvUpsertListener(PhoenixConnection conn, int upsertBatchSize) {
+            this.conn = conn;
+            this.upsertBatchSize = upsertBatchSize;
+        }
+
+        @Override
+        public void upsertDone(long upsertCount) {
+            totalUpserts = upsertCount;
+            if (upsertCount % upsertBatchSize == 0) {
+                if (upsertCount % 1000 == 0) {
+                    LOG.info("Processed upsert #{}", upsertCount);
+                }
+                try {
+                    LOG.info("Committing after {} records", upsertCount);
+                    conn.commit();
+                } catch (SQLException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        @Override
+        public void errorOnRecord(CSVRecord csvRecord, String errorMessage) {
+            LOG.error("Error upserting record {}: {}", csvRecord, errorMessage);
+        }
+
+        /**
+         * Get the total number of upserts that this listener has been notified about up until now.
+         *
+         * @return the total count of upserts
+         */
+        public long getTotalUpsertCount() {
+            return totalUpserts;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f15c7f1e/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 90f04ee..1318eca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -105,10 +105,11 @@ public class PhoenixRuntime {
     private static final String HEADER_OPTION = "-h";
     private static final String STRICT_OPTION = "-s";
     private static final String CSV_OPTION = "-d";
+    private static final String ARRAY_ELEMENT_SEP_OPTION = "-a";
     private static final String HEADER_IN_LINE = "in-line";
     private static final String SQL_FILE_EXT = ".sql";
     private static final String CSV_FILE_EXT = ".csv";
-    
+
     private static void usageError() {
         System.err.println("Usage: psql [-t table-name] [-h comma-separated-column-names | in-line] [-d field-delimiter-char quote-char escape-char]<zookeeper>  <path-to-sql-or-csv-file>...\n" +
                 "  By default, the name of the CSV file is used to determine the Phoenix table into which the CSV data is loaded\n" +
@@ -120,6 +121,7 @@ public class PhoenixRuntime {
                 "  -s uses strict mode by throwing an exception if a column name doesn't match during CSV loading.\n" +
                 "  -d uses custom delimiters for CSV loader, need to specify single char for field delimiter, phrase delimiter, and escape char.\n" +
                 "     number is NOT usually a delimiter and shall be taken as 1 -> ctrl A, 2 -> ctrl B ... 9 -> ctrl I. \n" +
+                "  -a define the array element separator, defaults to ':'\n" +
                 "Examples:\n" +
                 "  psql localhost my_ddl.sql\n" +
                 "  psql localhost my_ddl.sql my_table.csv\n" +
@@ -146,6 +148,7 @@ public class PhoenixRuntime {
             String tableName = null;
             List<String> columns = null;
             boolean isStrict = false;
+            String arrayElementSeparator = CSVCommonsLoader.DEFAULT_ARRAY_ELEMENT_SEPARATOR;
             List<String> customMetaCharacters = new ArrayList<String>();
 
             int i = 0;
@@ -179,6 +182,8 @@ public class PhoenixRuntime {
                             usageError();
                         }
                     }
+                } else if (ARRAY_ELEMENT_SEP_OPTION.equals(args[i])) {
+                    arrayElementSeparator = args[++i];
                 } else {
                     break;
                 }
@@ -200,7 +205,7 @@ public class PhoenixRuntime {
                         tableName = fileName.substring(fileName.lastIndexOf(File.separatorChar) + 1, fileName.length()-CSV_FILE_EXT.length());
                     }
                     CSVCommonsLoader csvLoader = 
-                    		new CSVCommonsLoader(conn, tableName, columns, isStrict, customMetaCharacters);
+                    		new CSVCommonsLoader(conn, tableName, columns, isStrict, customMetaCharacters, arrayElementSeparator);
                     csvLoader.upsert(fileName);
                 } else {
                     usageError();

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f15c7f1e/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 73bf013..141e43d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -21,63 +21,92 @@ package org.apache.phoenix.util;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 
 public class QueryUtil {
-	
+
     /**
      *  Column family name index within ResultSet resulting from {@link DatabaseMetaData#getColumns(String,
String, String, String)}
      */
     public static final int COLUMN_FAMILY_POSITION = 24;
 
- 	/**
-	 *  Column name index within ResultSet resulting from {@link DatabaseMetaData#getColumns(String,
String, String, String)}
-	 */
-	public static final int COLUMN_NAME_POSITION = 4;
-	/**
-	 * Data type index within ResultSet resulting from {@link DatabaseMetaData#getColumns(String,
String, String, String)}
-	 */
-	public static final int DATA_TYPE_POSITION = 5;
-
-	/**
-	 * Generates the upsert statement based on number of ColumnInfo. If
-	 * ColumnInfo is unavailable, it produces a generic UPSERT query without
-	 * columns information using number of columns.
-	 * 
-	 * @return Upsert Statement
-	 */
-	public static String constructUpsertStatement(ColumnInfo[] columnTypes,
-			String tableName, int numColumns) {
-		if(numColumns <= 0) {
-			throw new RuntimeException("Number of columns in HBase table cannot be less than 1");
-		}
-		StringBuilder sb = new StringBuilder();
-		sb.append("UPSERT INTO ");
-		sb.append(tableName);
-		if (columnTypes != null) {
-			sb.append("(");
-			for (ColumnInfo columnType : columnTypes) {
-				if (columnType != null) {
-					sb.append(columnType.getColumnName());
-					sb.append(",");
-				}
-			}
-			// Remove the trailing comma
-			sb.setLength(sb.length() - 1);
-			sb.append(") ");
-		}
-		sb.append("\n");
-		sb.append("VALUES (");
-		for (short i = 0; i < numColumns - 1; i++) {
-			sb.append("?,");
-		}
-		sb.append("?)");
-
-		return sb.toString();
-	}
-
-	public static String getUrl(String server) {
-		return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + server;
-	}
+    /**
+     *  Column name index within ResultSet resulting from {@link DatabaseMetaData#getColumns(String,
String, String, String)}
+     */
+    public static final int COLUMN_NAME_POSITION = 4;
+    /**
+     * Data type index within ResultSet resulting from {@link DatabaseMetaData#getColumns(String,
String, String, String)}
+     */
+    public static final int DATA_TYPE_POSITION = 5;
+
+    /**
+     * Generate an upsert statement based on a list of {@code ColumnInfo}s with parameter
markers. The list of
+     * {@code ColumnInfo}s must contain at least one element.
+     *
+     * @param tableName name of the table for which the upsert statement is to be created
+     * @param columnInfos list of column to be included in the upsert statement
+     * @return the created {@code UPSERT} statement
+     */
+    public static String constructUpsertStatement(String tableName, List<ColumnInfo>
columnInfos) {
+
+        if (columnInfos.isEmpty()) {
+            throw new IllegalArgumentException("At least one column must be provided for
upserts");
+        }
+
+        List<String> parameterList = Lists.newArrayList();
+        for (int i = 0; i < columnInfos.size(); i++) {
+            parameterList.add("?");
+        }
+        return String.format(
+                "UPSERT INTO %s (%s) VALUES (%s)",
+                tableName,
+                Joiner.on(", ").join(
+                        Iterables.transform(
+                                columnInfos,
+                                new Function<ColumnInfo, String>() {
+                                    @Nullable
+                                    @Override
+                                    public String apply(@Nullable ColumnInfo columnInfo)
{
+                                        return columnInfo.getColumnName();
+                                    }
+                                })),
+                Joiner.on(", ").join(parameterList));
+
+    }
+
+    /**
+     * Generate a generic upsert statement based on a number of columns. The created upsert
statement will not include
+     * any named columns, but will include parameter markers for the given number of columns.
The number of columns
+     * must be greater than zero.
+     *
+     * @param tableName name of the table for which the upsert statement is to be created
+     * @param numColumns number of columns to be included in the upsert statement
+     * @return the created {@code UPSERT} statement
+     */
+    public static String constructGenericUpsertStatement(String tableName, int numColumns)
{
+
+
+        if (numColumns == 0) {
+            throw new IllegalArgumentException("At least one column must be provided for
upserts");
+        }
+
+        List<String> parameterList = Lists.newArrayListWithCapacity(numColumns);
+        for (int i = 0; i < numColumns; i++) {
+            parameterList.add("?");
+        }
+        return String.format("UPSERT INTO %s VALUES (%s)", tableName, Joiner.on(", ").join(parameterList));
+    }
+
+    public static String getUrl(String server) {
+        return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + server;
+    }
 
     public static String getExplainPlan(ResultSet rs) throws SQLException {
         StringBuilder buf = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f15c7f1e/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
new file mode 100644
index 0000000..dbb09d5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util.csv;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * Executes upsert statements on a provided {@code PreparedStatement} based on incoming CSV
records, notifying a
+ * listener each time the prepared statement is executed.
+ */
+public class CsvUpsertExecutor implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CsvUpsertExecutor.class);
+
+    private final String arrayElementSeparator;
+    private final Connection conn;
+    private final List<PDataType> dataTypes;
+    private final List<Function<String,Object>> conversionFunctions;
+    private final PreparedStatement preparedStatement;
+    private final UpsertListener upsertListener;
+    private long upsertCount = 0L;
+
+    /**
+     * A listener that is called for events based on incoming CSV data.
+     */
+    public static interface UpsertListener {
+
+        /**
+         * Called when an upsert has been sucessfully completed. The given upsertCount is
the total number of upserts
+         * completed on the caller up to this point.
+         *
+         * @param upsertCount total number of upserts that have been completed
+         */
+        void upsertDone(long upsertCount);
+
+
+        /**
+         * Called when executing a prepared statement has failed on a given record.
+         *
+         * @param csvRecord the CSV record that was being upserted when the error occurred
+         */
+        void errorOnRecord(CSVRecord csvRecord, String errorMessage);
+    }
+
+
+    /**
+     * Static constructor method for creating a CsvUpsertExecutor.
+     *
+     * @param conn Phoenix connection upon which upserts are to be performed
+     * @param tableName name of the table in which upserts are to be performed
+     * @param columnInfoList description of the columns to be upserted to, in the same order
as in the CSV input
+     * @param upsertListener listener that will be notified of upserts, can be null
+     * @param arrayElementSeparator separator string to delimit string representations of
arrays
+     * @return the created CsvUpsertExecutor
+     */
+    public static CsvUpsertExecutor create(PhoenixConnection conn, String tableName, List<ColumnInfo>
columnInfoList,
+            UpsertListener upsertListener, String arrayElementSeparator) {
+        PreparedStatement preparedStatement = null;
+        try {
+            String upsertSql = QueryUtil.constructUpsertStatement(tableName, columnInfoList);
+            LOG.info("Upserting SQL data with {}", upsertSql);
+            preparedStatement = conn.prepareStatement(upsertSql);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+        return new CsvUpsertExecutor(conn, columnInfoList, preparedStatement, upsertListener,
+                arrayElementSeparator);
+    }
+
+    /**
+     * Construct with the definition of incoming columns, and the statement upon which upsert
statements
+     * are to be performed.
+     */
+    CsvUpsertExecutor(Connection conn, List<ColumnInfo> columnInfoList, PreparedStatement
preparedStatement,
+            UpsertListener upsertListener, String arrayElementSeparator) {
+        this.conn = conn;
+        this.preparedStatement = preparedStatement;
+        this.upsertListener = upsertListener;
+        this.arrayElementSeparator = arrayElementSeparator;
+        this.dataTypes = Lists.newArrayList();
+        this.conversionFunctions = Lists.newArrayList();
+        for (ColumnInfo columnInfo : columnInfoList) {
+            PDataType dataType = PDataType.fromTypeId(columnInfo.getSqlType());
+            dataTypes.add(dataType);
+            conversionFunctions.add(createConversionFunction(dataType));
+        }
+    }
+
+    /**
+     * Execute upserts for each CSV record contained in the given iterable, notifying this
instance's
+     * {@code UpsertListener} for each completed upsert.
+     *
+     * @param csvRecords iterable of CSV records to be upserted
+     */
+    public void execute(Iterable<CSVRecord> csvRecords) {
+        for (CSVRecord csvRecord : csvRecords) {
+            execute(csvRecord);
+        }
+    }
+
+    /**
+     * Upsert a single record.
+     *
+     * @param csvRecord CSV record containing the data to be upserted
+     */
+    void execute(CSVRecord csvRecord) {
+        try {
+            for (int fieldIndex = 0; fieldIndex < conversionFunctions.size(); fieldIndex++)
{
+                Object sqlValue = conversionFunctions.get(fieldIndex).apply(csvRecord.get(fieldIndex));
+                if (sqlValue != null) {
+                    preparedStatement.setObject(fieldIndex + 1, sqlValue);
+                } else {
+                    preparedStatement.setNull(fieldIndex + 1, dataTypes.get(fieldIndex).getSqlType());
+                }
+            }
+            preparedStatement.execute();
+            upsertListener.upsertDone(++upsertCount);
+        } catch (Exception e) {
+            if (LOG.isDebugEnabled()) {
+                // Even though this is an error we only log it with debug logging because
we're notifying the
+                // listener, and it can do its own logging if needed
+                LOG.debug("Error on CSVRecord " + csvRecord, e);
+            }
+            upsertListener.errorOnRecord(csvRecord, e.getMessage());
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            preparedStatement.close();
+        } catch (SQLException e) {
+            // An exception while closing the prepared statement is most likely a sign of
a real problem, so we don't
+            // want to hide it with closeQuietly or something similar
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Function<String, Object> createConversionFunction(PDataType dataType) {
+        if (dataType.isArrayType()) {
+            return new ArrayDatatypeConversionFunction(
+                    new StringToArrayConverter(
+                            conn,
+                            arrayElementSeparator,
+                            PDataType.fromTypeId(dataType.getSqlType() - PDataType.ARRAY_TYPE_BASE)));
+        } else {
+            return new SimpleDatatypeConversionFunction(dataType);
+        }
+    }
+
+    /**
+     * Performs typed conversion from String values to a given column value type.
+     */
+    private static class SimpleDatatypeConversionFunction implements Function<String,
Object> {
+
+        private final PDataType dataType;
+
+        private SimpleDatatypeConversionFunction(PDataType dataType) {
+            this.dataType = dataType;
+        }
+
+        @Nullable
+        @Override
+        public Object apply(@Nullable String input) {
+            return dataType.toObject(input);
+        }
+    }
+
+    /**
+     * Converts string representations of arrays into Phoenix arrays of the correct type.
+     */
+    private static class ArrayDatatypeConversionFunction implements Function<String, Object>
{
+
+        private final StringToArrayConverter arrayConverter;
+
+        private ArrayDatatypeConversionFunction(StringToArrayConverter arrayConverter) {
+            this.arrayConverter = arrayConverter;
+        }
+
+        @Nullable
+        @Override
+        public Object apply(@Nullable String input) {
+            try {
+                return arrayConverter.toArray(input);
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f15c7f1e/phoenix-core/src/main/java/org/apache/phoenix/util/csv/StringToArrayConverter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/StringToArrayConverter.java
b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/StringToArrayConverter.java
new file mode 100644
index 0000000..cfe2589
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/StringToArrayConverter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util.csv;
+
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import javax.annotation.Nullable;
+
+import org.apache.phoenix.schema.PDataType;
+
+import com.google.common.base.Function;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Converts strings with delimited values into Phoenix arrays.
+ */
+class StringToArrayConverter {
+
+    private final Splitter splitter;
+    private final Connection conn;
+    private final PDataType elementDataType;
+    private final ElementConvertFunction elementConvertFunction;
+
+    /**
+     * Instantiate with the array value separator and data type.
+     *
+     * @param conn Phoenix connection to target database
+     * @param separatorString string used to separate incoming array values in strings
+     * @param elementDataType datatype of the elements of arrays to be created
+     */
+    public StringToArrayConverter(Connection conn, String separatorString,
+            PDataType elementDataType) {
+        this.conn = conn;
+        this.splitter = Splitter.on(separatorString);
+        this.elementDataType = elementDataType;
+        this.elementConvertFunction = new ElementConvertFunction(elementDataType);
+    }
+
+    /**
+     * Convert an input delimited string into a phoenix array of the configured type.
+     *
+     * @param input string containing delimited array values
+     * @return the array containing the values represented in the input string
+     */
+    public Array toArray(String input) throws SQLException {
+        if (input == null || input.isEmpty()) {
+            return conn.createArrayOf(elementDataType.getSqlTypeName(), new Object[0]);
+        }
+        return conn.createArrayOf(
+                elementDataType.getSqlTypeName(),
+                Lists.newArrayList(
+                        Iterables.transform(
+                                splitter.split(input),
+                                elementConvertFunction)).toArray());
+    }
+
+    /**
+     * Converts incoming string values into their typed equivalent.
+     */
+    private static class ElementConvertFunction implements Function<String, Object>
{
+
+        private final PDataType pdataType;
+
+        private ElementConvertFunction(PDataType pdataType) {
+            this.pdataType = pdataType;
+        }
+
+        @Nullable
+        @Override
+        public Object apply(@Nullable String input) {
+            return pdataType.toObject(input);
+        }
+    }
+}


Mime
View raw message