Return-Path: X-Original-To: apmail-apex-dev-archive@minotaur.apache.org Delivered-To: apmail-apex-dev-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2A36F18475 for ; Mon, 4 Jan 2016 09:47:09 +0000 (UTC) Received: (qmail 75194 invoked by uid 500); 4 Jan 2016 09:47:09 -0000 Delivered-To: apmail-apex-dev-archive@apex.apache.org Received: (qmail 75131 invoked by uid 500); 4 Jan 2016 09:47:09 -0000 Mailing-List: contact dev-help@apex.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.incubator.apache.org Delivered-To: mailing list dev@apex.incubator.apache.org Received: (qmail 75120 invoked by uid 99); 4 Jan 2016 09:47:08 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Jan 2016 09:47:08 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 5A3FFC0B5F for ; Mon, 4 Jan 2016 09:47:08 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.427 X-Spam-Level: X-Spam-Status: No, score=0.427 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.554, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id UJKpyQSuevnl for ; Mon, 4 Jan 2016 09:46:59 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 462E320D53 for ; Mon, 4 Jan 2016 09:46:59 +0000 (UTC) Received: (qmail 74760 invoked by uid 99); 4 Jan 2016 09:46:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Jan 2016 09:46:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0706FDFFB9; Mon, 4 Jan 2016 09:46:59 +0000 (UTC) From: shubham-pathak22 To: dev@apex.incubator.apache.org Reply-To: dev@apex.incubator.apache.org References: In-Reply-To: Subject: [GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs... Content-Type: text/plain Message-Id: <20160104094659.0706FDFFB9@git1-us-west.apache.org> Date: Mon, 4 Jan 2016 09:46:59 +0000 (UTC) Github user shubham-pathak22 commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48715692 --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java --- @@ -62,247 +71,193 @@ * @since 3.2.0 */ @InterfaceStability.Evolving -public class CsvParser extends Parser +public class CsvParser extends Parser> { + /** + * Map Reader to read delimited records + */ + private transient CsvMapReader csvMapReader; + /** + * Bean Reader to read delimited records + */ + private transient CsvBeanReader csvBeanReader; + /** + * Reader used by csvMapReader and csvBeanReader + */ + private transient ReusableStringReader csvStringReader; - private ArrayList fields; - @NotNull - protected int fieldDelimiter; - protected String lineDelimiter; - + /** + * Contents of the schema + */ + private String schema; + /** + * Complete path where schema resides. + */ @NotNull - protected String fieldInfo; - - protected transient String[] nameMapping; - protected transient CellProcessor[] processors; - private transient CsvBeanReader csvReader; - - public enum FIELD_TYPE - { - BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE - }; + private transient String schemaPath; + /** + * Schema is read into this object to access fields + */ + private transient DelimitedSchema delimitedParserSchema; + /** + * Cell processors are an integral part of reading and writing with Super CSV + * they automate the data type conversions, and enforce constraints. + */ + private transient CellProcessor[] processors; + /** + * Names of all the fields in the same order of incoming records + */ + private transient String[] nameMapping; + /** + * header-this will be delimiter separated string of field names + */ + private transient String header; + /** + * Reading preferences that are passed through schema + */ + private transient CsvPreference preference; - @NotNull - private transient ReusableStringReader csvStringReader = new ReusableStringReader(); + @AutoMetric + long parsedOutputCount; - public CsvParser() + @Override + public void beginWindow(long windowId) { - fields = new ArrayList(); - fieldDelimiter = ','; - lineDelimiter = "\r\n"; + parsedOutputCount = 0; } @Override public void setup(OperatorContext context) { - super.setup(context); - - logger.info("field info {}", fieldInfo); - fields = new ArrayList(); - String[] fieldInfoTuple = fieldInfo.split(","); - for (int i = 0; i < fieldInfoTuple.length; i++) { - String[] fieldTuple = fieldInfoTuple[i].split(":"); - Field field = new Field(); - field.setName(fieldTuple[0]); - String[] typeFormat = fieldTuple[1].split("\\|"); - field.setType(typeFormat[0].toUpperCase()); - if (typeFormat.length > 1) { - field.setFormat(typeFormat[1]); - } - getFields().add(field); - } - - CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build(); - csvReader = new CsvBeanReader(csvStringReader, preference); - int countKeyValue = getFields().size(); - logger.info("countKeyValue {}", countKeyValue); - nameMapping = new String[countKeyValue]; - processors = new CellProcessor[countKeyValue]; - initialise(nameMapping, processors); + delimitedParserSchema = new DelimitedSchema(schema); + preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(), + delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build(); + nameMapping = delimitedParserSchema.getFieldNames().toArray( + new String[delimitedParserSchema.getFieldNames().size()]); + header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + ""); + processors = getProcessor(delimitedParserSchema.getFields()); + csvStringReader = new ReusableStringReader(); + csvMapReader = new CsvMapReader(csvStringReader, preference); + csvBeanReader = new CsvBeanReader(csvStringReader, preference); } - private void initialise(String[] nameMapping, CellProcessor[] processors) - { - for (int i = 0; i < getFields().size(); i++) { - FIELD_TYPE type = getFields().get(i).type; - nameMapping[i] = getFields().get(i).name; - if (type == FIELD_TYPE.DOUBLE) { - processors[i] = new Optional(new ParseDouble()); - } else if (type == FIELD_TYPE.INTEGER) { - processors[i] = new Optional(new ParseInt()); - } else if (type == FIELD_TYPE.FLOAT) { - processors[i] = new Optional(new ParseDouble()); - } else if (type == FIELD_TYPE.LONG) { - processors[i] = new Optional(new ParseLong()); - } else if (type == FIELD_TYPE.SHORT) { - processors[i] = new Optional(new ParseInt()); - } else if (type == FIELD_TYPE.STRING) { - processors[i] = new Optional(); - } else if (type == FIELD_TYPE.CHARACTER) { - processors[i] = new Optional(new ParseChar()); - } else if (type == FIELD_TYPE.BOOLEAN) { - processors[i] = new Optional(new ParseBool()); - } else if (type == FIELD_TYPE.DATE) { - String dateFormat = getFields().get(i).format; - processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat)); - } - } - } @Override - public Object convert(String tuple) + public Object convert(byte[] tuple) { - try { - csvStringReader.open(tuple); - return csvReader.read(clazz, nameMapping, processors); - } catch (IOException e) { - logger.debug("Error while converting tuple {} {}",tuple,e.getMessage()); - return null; - } + //This method is not invoked for CSV parser + return null; } @Override - public void teardown() + public void processTuple(byte[] tuple) { - try { - if (csvReader != null) { - csvReader.close(); + if (tuple == null) { + if (err.isConnected()) { + err.emit(new KeyValPair(null, "Blank/null tuple")); } - } catch (IOException e) { - DTThrowable.rethrow(e); - } - } - - @Override - public String processErorrTuple(String input) - { - return input; - } - - public static class Field - { - String name; - String format; - FIELD_TYPE type; - - public String getName() - { - return name; + errorTupleCount++; + return; } - - public void setName(String name) - { - this.name = name; - } - - public FIELD_TYPE getType() - { - return type; - } - - public void setType(String type) - { - this.type = FIELD_TYPE.valueOf(type); + String incomingString = new String(tuple); + if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) { + if (err.isConnected()) { + err.emit(new KeyValPair(incomingString, "Blank/header tuple")); + } + errorTupleCount++; + return; } + try { + if (parsedOutput.isConnected()) { + csvStringReader.open(incomingString); + Map map = csvMapReader.read(nameMapping, processors); + parsedOutput.emit(map); + parsedOutputCount++; + } - public String getFormat() - { - return format; - } + if (out.isConnected() && clazz != null) { + csvStringReader.open(incomingString); + Object obj = csvBeanReader.read(clazz, nameMapping, processors); + out.emit(obj); + emittedObjectCount++; + } - public void setFormat(String format) - { - this.format = format; + } catch (SuperCsvException e) { + if (err.isConnected()) { + err.emit(new KeyValPair(incomingString, e.getMessage())); + } + errorTupleCount++; + logger.error("Tuple could not be parsed. Reason {}", e.getMessage()); + } catch (IOException e) { + logger.error("Exception in process method {}", e.getMessage()); + DTThrowable.rethrow(e); } - - } - - /** - * Gets the array list of the fields, a field being a POJO containing the name - * of the field and type of field. - * - * @return An array list of Fields. - */ - public ArrayList getFields() - { - return fields; - } - - /** - * Sets the array list of the fields, a field being a POJO containing the name - * of the field and type of field. - * - * @param fields - * An array list of Fields. - */ - public void setFields(ArrayList fields) - { - this.fields = fields; } - /** - * Gets the delimiter which separates fields in incoming data. - * - * @return fieldDelimiter - */ - public int getFieldDelimiter() + @Override + public KeyValPair processErorrTuple(byte[] input) { - return fieldDelimiter; + //This method is not invoked for CSV parser + return null; } /** - * Sets the delimiter which separates fields in incoming data. - * - * @param fieldDelimiter + * Returns array of cellprocessors, one for each field */ - public void setFieldDelimiter(int fieldDelimiter) + private CellProcessor[] getProcessor(List fields) { - this.fieldDelimiter = fieldDelimiter; + CellProcessor[] processor = new CellProcessor[fields.size()]; + int fieldCount = 0; + for (Field field : fields) { + processor[fieldCount++] = CellProcessorBuilder.getCellProcessor(field.getType(), field.getConstraints()); + } + return processor; } - /** - * Gets the delimiter which separates lines in incoming data. - * - * @return lineDelimiter - */ - public String getLineDelimiter() + @Override + public void teardown() { - return lineDelimiter; + try { + csvMapReader.close(); + } catch (IOException e) { + logger.error("Error while closing csv map reader {}", e.getMessage()); + } + try { + csvBeanReader.close(); + } catch (IOException e) { + logger.error("Error while closing csv bean reader {}", e.getMessage()); + } } /** - * Sets the delimiter which separates line in incoming data. + * Complete hdfs path of schema * - * @param lineDelimiter + * @return */ - public void setLineDelimiter(String lineDelimiter) + public String getSchemaPath() { - this.lineDelimiter = lineDelimiter; + return schemaPath; } /** - * Gets the name of the fields with type and format ( for date ) as comma - * separated string in same order as incoming data. e.g - * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy + * Set path of schema * - * @return fieldInfo + * @param schemaPath + * path of the schema file in hdfs */ - public String getFieldInfo() + public void setSchemaPath(String schemaPath) --- End diff -- Good suggestion. Will make necessary changes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---