Return-Path: X-Original-To: apmail-apex-dev-archive@minotaur.apache.org Delivered-To: apmail-apex-dev-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7450D18C95 for ; Mon, 4 Jan 2016 05:40:22 +0000 (UTC) Received: (qmail 56564 invoked by uid 500); 4 Jan 2016 05:40:22 -0000 Delivered-To: apmail-apex-dev-archive@apex.apache.org Received: (qmail 56502 invoked by uid 500); 4 Jan 2016 05:40:22 -0000 Mailing-List: contact dev-help@apex.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.incubator.apache.org Delivered-To: mailing list dev@apex.incubator.apache.org Received: (qmail 56491 invoked by uid 99); 4 Jan 2016 05:40:22 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Jan 2016 05:40:22 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id A5B66C771F for ; Mon, 4 Jan 2016 05:40:21 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.98 X-Spam-Level: X-Spam-Status: No, score=0.98 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id PUKp7Srhk6IX for ; Mon, 4 Jan 2016 05:40:09 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 7F67723037 for ; Mon, 4 Jan 2016 05:40:09 +0000 (UTC) Received: (qmail 56449 invoked by uid 99); 4 Jan 2016 05:40:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Jan 2016 05:40:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 507FADFDC5; Mon, 4 Jan 2016 05:40:09 +0000 (UTC) From: chinmaykolhatkar To: dev@apex.incubator.apache.org Reply-To: dev@apex.incubator.apache.org References: In-Reply-To: Subject: [GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs... Content-Type: text/plain Message-Id: <20160104054009.507FADFDC5@git1-us-west.apache.org> Date: Mon, 4 Jan 2016 05:40:09 +0000 (UTC) Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705421 --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java --- @@ -62,247 +71,193 @@ * @since 3.2.0 */ @InterfaceStability.Evolving -public class CsvParser extends Parser +public class CsvParser extends Parser> { + /** + * Map Reader to read delimited records + */ + private transient CsvMapReader csvMapReader; + /** + * Bean Reader to read delimited records + */ + private transient CsvBeanReader csvBeanReader; + /** + * Reader used by csvMapReader and csvBeanReader + */ + private transient ReusableStringReader csvStringReader; - private ArrayList fields; - @NotNull - protected int fieldDelimiter; - protected String lineDelimiter; - + /** + * Contents of the schema + */ + private String schema; + /** + * Complete path where schema resides. + */ @NotNull - protected String fieldInfo; - - protected transient String[] nameMapping; - protected transient CellProcessor[] processors; - private transient CsvBeanReader csvReader; - - public enum FIELD_TYPE - { - BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE - }; + private transient String schemaPath; + /** + * Schema is read into this object to access fields + */ + private transient DelimitedSchema delimitedParserSchema; + /** + * Cell processors are an integral part of reading and writing with Super CSV + * they automate the data type conversions, and enforce constraints. + */ + private transient CellProcessor[] processors; + /** + * Names of all the fields in the same order of incoming records + */ + private transient String[] nameMapping; + /** + * header-this will be delimiter separated string of field names + */ + private transient String header; + /** + * Reading preferences that are passed through schema + */ + private transient CsvPreference preference; - @NotNull - private transient ReusableStringReader csvStringReader = new ReusableStringReader(); + @AutoMetric + long parsedOutputCount; - public CsvParser() + @Override + public void beginWindow(long windowId) { - fields = new ArrayList(); - fieldDelimiter = ','; - lineDelimiter = "\r\n"; + parsedOutputCount = 0; } @Override public void setup(OperatorContext context) { - super.setup(context); - - logger.info("field info {}", fieldInfo); - fields = new ArrayList(); - String[] fieldInfoTuple = fieldInfo.split(","); - for (int i = 0; i < fieldInfoTuple.length; i++) { - String[] fieldTuple = fieldInfoTuple[i].split(":"); - Field field = new Field(); - field.setName(fieldTuple[0]); - String[] typeFormat = fieldTuple[1].split("\\|"); - field.setType(typeFormat[0].toUpperCase()); - if (typeFormat.length > 1) { - field.setFormat(typeFormat[1]); - } - getFields().add(field); - } - - CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build(); - csvReader = new CsvBeanReader(csvStringReader, preference); - int countKeyValue = getFields().size(); - logger.info("countKeyValue {}", countKeyValue); - nameMapping = new String[countKeyValue]; - processors = new CellProcessor[countKeyValue]; - initialise(nameMapping, processors); + delimitedParserSchema = new DelimitedSchema(schema); + preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(), + delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build(); + nameMapping = delimitedParserSchema.getFieldNames().toArray( + new String[delimitedParserSchema.getFieldNames().size()]); + header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + ""); + processors = getProcessor(delimitedParserSchema.getFields()); + csvStringReader = new ReusableStringReader(); + csvMapReader = new CsvMapReader(csvStringReader, preference); + csvBeanReader = new CsvBeanReader(csvStringReader, preference); } - private void initialise(String[] nameMapping, CellProcessor[] processors) - { - for (int i = 0; i < getFields().size(); i++) { - FIELD_TYPE type = getFields().get(i).type; - nameMapping[i] = getFields().get(i).name; - if (type == FIELD_TYPE.DOUBLE) { - processors[i] = new Optional(new ParseDouble()); - } else if (type == FIELD_TYPE.INTEGER) { - processors[i] = new Optional(new ParseInt()); - } else if (type == FIELD_TYPE.FLOAT) { - processors[i] = new Optional(new ParseDouble()); - } else if (type == FIELD_TYPE.LONG) { - processors[i] = new Optional(new ParseLong()); - } else if (type == FIELD_TYPE.SHORT) { - processors[i] = new Optional(new ParseInt()); - } else if (type == FIELD_TYPE.STRING) { - processors[i] = new Optional(); - } else if (type == FIELD_TYPE.CHARACTER) { - processors[i] = new Optional(new ParseChar()); - } else if (type == FIELD_TYPE.BOOLEAN) { - processors[i] = new Optional(new ParseBool()); - } else if (type == FIELD_TYPE.DATE) { - String dateFormat = getFields().get(i).format; - processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat)); - } - } - } @Override - public Object convert(String tuple) + public Object convert(byte[] tuple) { - try { - csvStringReader.open(tuple); - return csvReader.read(clazz, nameMapping, processors); - } catch (IOException e) { - logger.debug("Error while converting tuple {} {}",tuple,e.getMessage()); - return null; - } + //This method is not invoked for CSV parser + return null; } @Override - public void teardown() + public void processTuple(byte[] tuple) --- End diff -- Lot of code duplicate here. Can you please create 2 methods, emitError and emitSuccess... Let these 2 methods emit on respective ports and also do other activities like increment the counter. This way the tasks are associated and never missed in any future code changes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---