apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2105) Enhance CSV Formatter to take in schema similar to Csv Parser
Date Wed, 01 Jun 2016 18:05:59 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15310784#comment-15310784
] 

ASF GitHub Bot commented on APEXMALHAR-2105:
--------------------------------------------

Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65412319
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
    @@ -58,215 +68,156 @@
     public class CsvFormatter extends Formatter<String>
     {
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected String classname;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    +  /**
    +   * Names of all the fields in the same order that would appear in output
    +   * records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV;
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Writing preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    +  /**
    +   * Contents of the schema. Schema is specified in a JSON format as per
    +   * {@link DelimitedSchema}
    +   */
       @NotNull
    -  protected String fieldInfo;
    +  private String schema;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
     
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  /**
    +   * metric to keep count of number of tuples emitted on error port
    +   */
    +  @AutoMetric
    +  protected long errorTupleCount;
    +
    +  /**
    +   * metric to keep count of number of tuples emitted on out port
    +   */
    +  @AutoMetric
    +  protected long emittedObjectCount;
     
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  protected transient CsvPreference preference;
    +  /**
    +   * metric to keep count of number of tuples emitted on input port
    +   */
    +  @AutoMetric
    +  protected long incomingTuplesCount;
     
       public CsvFormatter()
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +  }
     
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    super.beginWindow(windowId);
    +    errorTupleCount = 0;
    +    emittedObjectCount = 0;
    +    incomingTuplesCount = 0;
       }
     
       @Override
       public void setup(Context.OperatorContext context)
       {
         super.setup(context);
    -
    -    //fieldInfo information
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -    preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    int countKeyValue = getFields().size();
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    -
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames()
    +        .toArray(new String[delimitedParserSchema.getFieldNames().size()]);
    +    processors = getProcessor(delimitedParserSchema.getFields());
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    +  /**
    +   * Returns array of cellprocessors, one for each field
    +   */
    +  private CellProcessor[] getProcessor(List<Field> fields)
       {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new FmtDate(dateFormat == null ? "dd/MM/yyyy" :
dateFormat));
    +    CellProcessor[] processor = new CellProcessor[fields.size()];
    +    int fieldCount = 0;
    +    for (Field field : fields) {
    +      if (field.getType() == FieldType.DATE) {
    +        String format = field.getConstraints().get(DelimitedSchema.DATE_FORMAT) == null
? null
    +            : (String)field.getConstraints().get(DelimitedSchema.DATE_FORMAT);
    +        processor[fieldCount++] = new Optional(new FmtDate(format == null ? "dd/MM/yyyy"
: format));
           } else {
    -        processors[i] = new Optional();
    +        processor[fieldCount++] = new Optional();
           }
         }
    -
    +    return processor;
       }
     
       @Override
       public String convert(Object tuple)
       {
    +    incomingTuplesCount++;
    +    if (tuple == null) {
    +      errorTupleCount++;
    +      logger.error(" Null tuple", tuple);
    +      return null;
    +    }
         try {
           StringWriter stringWriter = new StringWriter();
           ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, preference);
           beanWriter.write(tuple, nameMapping, processors);
           beanWriter.flush();
           beanWriter.close();
    +      emittedObjectCount++;
           return stringWriter.toString();
         } catch (SuperCsvException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    +      logger.error("Error while converting tuple {} {}", tuple, e.getMessage());
    +      errorTupleCount++;
         } catch (IOException e) {
           DTThrowable.rethrow(e);
         }
         return null;
       }
     
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    -    }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    -    }
    -
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    -
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    -    }
    -  }
    -
       /**
    -   * Gets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    +   * Get the schema
        * 
    -   * @return An array list of Fields.
    +   * @return schema
        */
    -  public ArrayList<Field> getFields()
    +  public String getSchema()
       {
    -    return fields;
    +    return schema;
       }
     
       /**
    -   * Sets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    +   * Set the schema
        * 
    -   * @param fields
    -   *          An array list of Fields.
    +   * @param schema
        */
    -  public void setFields(ArrayList<Field> fields)
    +  public void setSchema(String schema)
       {
    -    this.fields = fields;
    +    this.schema = schema;
       }
     
    -  /**
    -   * Gets the delimiter which separates fields in incoming data.
    -   * 
    -   * @return fieldDelimiter
    -   */
    -  public int getFieldDelimiter()
    +  @VisibleForTesting
    +  public long getErrorTupleCount()
       {
    -    return fieldDelimiter;
    +    return errorTupleCount;
       }
     
    -  /**
    -   * Sets the delimiter which separates fields in incoming data.
    -   * 
    -   * @param fieldDelimiter
    -   */
    -  public void setFieldDelimiter(int fieldDelimiter)
    +  @VisibleForTesting
    +  public long getEmittedObjectCount()
    --- End diff --
    
    You can make this protected? As the testcase will be in the same package, it'll be accessible
via protected as well.


> Enhance CSV Formatter to take in schema similar to Csv Parser
> -------------------------------------------------------------
>
>                 Key: APEXMALHAR-2105
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2105
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: shubham pathak
>
> CSV Parser takes in a  schema that specifies field names and constraints. CSV Formatter
also needs same information, but in the current implementation , it takes it as "fieldInfo".
 Enhancing CSV Formatter to support the same schema as CSV Parser would make it simpler for
the end user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message