apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chinmaykolhatkar <...@git.apache.org>
Subject [GitHub] incubator-apex-malhar pull request #306: APEXMALHAR-2105 enhancing CSV forma...
Date Wed, 01 Jun 2016 18:05:30 GMT
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65412319
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
    @@ -58,215 +68,156 @@
     public class CsvFormatter extends Formatter<String>
     {
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected String classname;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    +  /**
    +   * Names of all the fields in the same order that would appear in output
    +   * records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Writing preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    +  /**
    +   * Contents of the schema.Schema is specified in a json format as per
    +   * {@link DelimitedSchema}
    +   */
       @NotNull
    -  protected String fieldInfo;
    +  private String schema;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
     
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  /**
    +   * metric to keep count of number of tuples emitted on error port port
    +   */
    +  @AutoMetric
    +  protected long errorTupleCount;
    +
    +  /**
    +   * metric to keep count of number of tuples emitted on out port
    +   */
    +  @AutoMetric
    +  protected long emittedObjectCount;
     
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  protected transient CsvPreference preference;
    +  /**
    +   * metric to keep count of number of tuples emitted on input port
    +   */
    +  @AutoMetric
    +  protected long incomingTuplesCount;
     
       public CsvFormatter()
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +  }
     
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    super.beginWindow(windowId);
    +    errorTupleCount = 0;
    +    emittedObjectCount = 0;
    +    incomingTuplesCount = 0;
       }
     
       @Override
       public void setup(Context.OperatorContext context)
       {
         super.setup(context);
    -
    -    //fieldInfo information
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -    preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    int countKeyValue = getFields().size();
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    -
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames()
    +        .toArray(new String[delimitedParserSchema.getFieldNames().size()]);
    +    processors = getProcessor(delimitedParserSchema.getFields());
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    +  /**
    +   * Returns array of cellprocessors, one for each field
    +   */
    +  private CellProcessor[] getProcessor(List<Field> fields)
       {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new FmtDate(dateFormat == null ? "dd/MM/yyyy" :
dateFormat));
    +    CellProcessor[] processor = new CellProcessor[fields.size()];
    +    int fieldCount = 0;
    +    for (Field field : fields) {
    +      if (field.getType() == FieldType.DATE) {
    +        String format = field.getConstraints().get(DelimitedSchema.DATE_FORMAT) == null
? null
    +            : (String)field.getConstraints().get(DelimitedSchema.DATE_FORMAT);
    +        processor[fieldCount++] = new Optional(new FmtDate(format == null ? "dd/MM/yyyy"
: format));
           } else {
    -        processors[i] = new Optional();
    +        processor[fieldCount++] = new Optional();
           }
         }
    -
    +    return processor;
       }
     
       @Override
       public String convert(Object tuple)
       {
    +    incomingTuplesCount++;
    +    if (tuple == null) {
    +      errorTupleCount++;
    +      logger.error(" Null tuple", tuple);
    +      return null;
    +    }
         try {
           StringWriter stringWriter = new StringWriter();
           ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, preference);
           beanWriter.write(tuple, nameMapping, processors);
           beanWriter.flush();
           beanWriter.close();
    +      emittedObjectCount++;
           return stringWriter.toString();
         } catch (SuperCsvException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    +      logger.error("Error while converting tuple {} {}", tuple, e.getMessage());
    +      errorTupleCount++;
         } catch (IOException e) {
           DTThrowable.rethrow(e);
         }
         return null;
       }
     
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    -    }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    -    }
    -
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    -
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    -    }
    -  }
    -
       /**
    -   * Gets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    +   * Get the schema
        * 
    -   * @return An array list of Fields.
    +   * @return schema
        */
    -  public ArrayList<Field> getFields()
    +  public String getSchema()
       {
    -    return fields;
    +    return schema;
       }
     
       /**
    -   * Sets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    +   * Set the schema
        * 
    -   * @param fields
    -   *          An array list of Fields.
    +   * @param schema
        */
    -  public void setFields(ArrayList<Field> fields)
    +  public void setSchema(String schema)
       {
    -    this.fields = fields;
    +    this.schema = schema;
       }
     
    -  /**
    -   * Gets the delimiter which separates fields in incoming data.
    -   * 
    -   * @return fieldDelimiter
    -   */
    -  public int getFieldDelimiter()
    +  @VisibleForTesting
    +  public long getErrorTupleCount()
       {
    -    return fieldDelimiter;
    +    return errorTupleCount;
       }
     
    -  /**
    -   * Sets the delimiter which separates fields in incoming data.
    -   * 
    -   * @param fieldDelimiter
    -   */
    -  public void setFieldDelimiter(int fieldDelimiter)
    +  @VisibleForTesting
    +  public long getEmittedObjectCount()
    --- End diff --
    
    Can you make this protected? Since the test case will be in the same package, it will still be
accessible with protected visibility (protected members are visible to all classes in the same package).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message