apex-dev mailing list archives

From chinmaykolhatkar <...@git.apache.org>
Subject [GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...
Date Mon, 04 Jan 2016 05:33:04 GMT
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705255
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Contents of the schema
    +   */
    +  private String schema;
    +  /**
    +   * Complete path where schema resides.
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private transient String schemaPath;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
     +   * Cell processors are an integral part of reading and writing with Super CSV;
     +   * they automate data type conversions and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
     +   * Names of all the fields, in the same order as in the incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
     +   * Header: a delimiter-separated string of field names
    +   */
    +  private transient String header;
    +  /**
     +   * Reading preferences that are passed through the schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
     +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
     -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    --- End diff --
    
    The if condition on err.isConnected() is not required when the action is just to emit.
    
    By default, an unconnected port is attached to Sink.BLACKHOLE, whose put(Object) does nothing.
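    
    In other words, the null-tuple branch can emit unconditionally. A minimal sketch of what the simplified branch could look like, assuming the err port emits a KeyValPair<String, String> in line with the new Parser<byte[], KeyValPair<String, String>> signature (the error payload string here is illustrative, not the PR's exact code):
    
        @Override
        public void processTuple(byte[] tuple)
        {
          if (tuple == null) {
            // No isConnected() guard needed: a port left unconnected in the
            // DAG defaults to Sink.BLACKHOLE, whose put(Object) silently
            // discards the tuple, so emitting unconditionally is safe.
            err.emit(new KeyValPair<String, String>(null, "null tuple"));
            return;
          }
          // ... parse the record and emit parsed output as in the PR
        }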


