apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shubham-pathak22 <...@git.apache.org>
Subject [GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...
Date Mon, 04 Jan 2016 09:46:59 GMT
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48715692
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Contents of the schema
    +   */
    +  private String schema;
    +  /**
    +   * Complete path where schema resides.
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private transient String schemaPath;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header-this will be delimiter separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
           }
    -    } catch (IOException e) {
    -      DTThrowable.rethrow(e);
    -    }
    -  }
    -
    -  @Override
    -  public String processErorrTuple(String input)
    -  {
    -    return input;
    -  }
    -
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    +      errorTupleCount++;
    +      return;
         }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    +    String incomingString = new String(tuple);
    +    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple"));
    +      }
    +      errorTupleCount++;
    +      return;
         }
    +    try {
    +      if (parsedOutput.isConnected()) {
    +        csvStringReader.open(incomingString);
    +        Map<String, Object> map = csvMapReader.read(nameMapping, processors);
    +        parsedOutput.emit(map);
    +        parsedOutputCount++;
    +      }
     
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    +      if (out.isConnected() && clazz != null) {
    +        csvStringReader.open(incomingString);
    +        Object obj = csvBeanReader.read(clazz, nameMapping, processors);
    +        out.emit(obj);
    +        emittedObjectCount++;
    +      }
     
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    +    } catch (SuperCsvException e) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
    +      }
    +      errorTupleCount++;
    +      logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
    +    } catch (IOException e) {
    +      logger.error("Exception in process method {}", e.getMessage());
    +      DTThrowable.rethrow(e);
         }
    -
    -  }
    -
    -  /**
    -   * Gets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @return An array list of Fields.
    -   */
    -  public ArrayList<Field> getFields()
    -  {
    -    return fields;
    -  }
    -
    -  /**
    -   * Sets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @param fields
    -   *          An array list of Fields.
    -   */
    -  public void setFields(ArrayList<Field> fields)
    -  {
    -    this.fields = fields;
       }
     
    -  /**
    -   * Gets the delimiter which separates fields in incoming data.
    -   * 
    -   * @return fieldDelimiter
    -   */
    -  public int getFieldDelimiter()
    +  @Override
    +  public KeyValPair<String, String> processErorrTuple(byte[] input)
       {
    -    return fieldDelimiter;
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       /**
    -   * Sets the delimiter which separates fields in incoming data.
    -   * 
    -   * @param fieldDelimiter
    +   * Returns array of cellprocessors, one for each field
        */
    -  public void setFieldDelimiter(int fieldDelimiter)
    +  private CellProcessor[] getProcessor(List<Field> fields)
       {
    -    this.fieldDelimiter = fieldDelimiter;
    +    CellProcessor[] processor = new CellProcessor[fields.size()];
    +    int fieldCount = 0;
    +    for (Field field : fields) {
    +      processor[fieldCount++] = CellProcessorBuilder.getCellProcessor(field.getType(), field.getConstraints());
    +    }
    +    return processor;
       }
     
    -  /**
    -   * Gets the delimiter which separates lines in incoming data.
    -   * 
    -   * @return lineDelimiter
    -   */
    -  public String getLineDelimiter()
    +  @Override
    +  public void teardown()
       {
    -    return lineDelimiter;
    +    try {
    +      csvMapReader.close();
    +    } catch (IOException e) {
    +      logger.error("Error while closing csv map reader {}", e.getMessage());
    +    }
    +    try {
    +      csvBeanReader.close();
    +    } catch (IOException e) {
    +      logger.error("Error while closing csv bean reader {}", e.getMessage());
    +    }
       }
     
       /**
    -   * Sets the delimiter which separates line in incoming data.
    +   * Complete hdfs path of schema
        * 
    -   * @param lineDelimiter
    +   * @return
        */
    -  public void setLineDelimiter(String lineDelimiter)
    +  public String getSchemaPath()
       {
    -    this.lineDelimiter = lineDelimiter;
    +    return schemaPath;
       }
     
       /**
    -   * Gets the name of the fields with type and format ( for date ) as comma
    -   * separated string in same order as incoming data. e.g
    -   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
    +   * Set path of schema
        * 
    -   * @return fieldInfo
    +   * @param schemaPath
    +   *          path of the schema file in hdfs
        */
    -  public String getFieldInfo()
    +  public void setSchemaPath(String schemaPath)
    --- End diff --
    
    Good suggestion. Will make necessary changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message