apex-commits mailing list archives

From chin...@apache.org
Subject apex-malhar git commit: APEXMALHAR-2105 enhancing CSV formatter to read field info from schema
Date Mon, 06 Jun 2016 23:36:19 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 18588f3a5 -> 3df6715d7


APEXMALHAR-2105 enhancing CSV formatter to read field info from schema


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3df6715d
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3df6715d
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3df6715d

Branch: refs/heads/master
Commit: 3df6715d7aec3094b7794ae7becd2e12dbeda4b3
Parents: 18588f3
Author: shubham <shubham-pathak22@github.com>
Authored: Wed Jun 1 18:47:36 2016 +0530
Committer: Chinmay Kolhatkar <chinmay@datatorrent.com>
Committed: Mon Jun 6 16:36:07 2016 -0700

----------------------------------------------------------------------
 .../contrib/formatter/CsvFormatter.java         | 263 ++++++++-----------
 .../contrib/formatter/CsvFormatterTest.java     | 164 ++++++------
 2 files changed, 195 insertions(+), 232 deletions(-)
----------------------------------------------------------------------
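For readers unfamiliar with the new configuration style: a schema consumed by this operator might look roughly like the sketch below. This is illustrative only; the top-level keys are inferred from the accessors the new setup() calls (getQuoteChar, getDelimiterChar, getLineDelimiter, getFieldNames, getFields), the field names are hypothetical, and the date-format constraint key is whatever DelimitedSchema.DATE_FORMAT resolves to (the operator javadoc refers to it as "fmt"). See DelimitedSchema in the parser package for the authoritative format.

    {
      "separator": ",",
      "quoteChar": "\"",
      "lineDelimiter": "\r\n",
      "fields": [
        {"name": "adId", "type": "Integer"},
        {"name": "adName", "type": "String"},
        {"name": "startDate", "type": "Date", "constraints": {"fmt": "dd/MM/yyyy"}}
      ]
    }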


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3df6715d/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
index 4b4bae3..34ba49c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
@@ -20,7 +20,7 @@ package com.datatorrent.contrib.formatter;
 
 import java.io.IOException;
 import java.io.StringWriter;
-import java.util.ArrayList;
+import java.util.List;
 
 import javax.validation.constraints.NotNull;
 
@@ -34,7 +34,13 @@ import org.supercsv.io.CsvBeanWriter;
 import org.supercsv.io.ICsvBeanWriter;
 import org.supercsv.prefs.CsvPreference;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
+import com.datatorrent.contrib.parser.DelimitedSchema;
+import com.datatorrent.contrib.parser.DelimitedSchema.Field;
+import com.datatorrent.contrib.parser.DelimitedSchema.FieldType;
 import com.datatorrent.lib.formatter.Formatter;
 import com.datatorrent.netlet.util.DTThrowable;
 
@@ -44,10 +50,14 @@ import com.datatorrent.netlet.util.DTThrowable;
  * java type.<br>
  * <br>
  * <b>Properties</b> <br>
- * <b>fieldInfo</b>:User need to specify fields and their types as a comma
- * separated string having format &lt;NAME&gt;:&lt;TYPE&gt;|&lt;FORMAT&gt; in
- * the same order as incoming data. FORMAT refers to dates with dd/mm/yyyy as
- * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
+ * <b>schema</b>: schema as a string, specified in JSON format as per
+ * {@link DelimitedSchema}. Currently only the Date constraint (fmt) is supported
+ * <br>
+ * <b>clazz</b>: POJO class <br>
+ * <b>Ports</b> <br>
+ * <b>in</b>: input tuple as a POJO. Each tuple represents a record<br>
+ * <b>out</b>: tuples that are converted to string are emitted on this port<br>
+ * <b>err</b>: tuples that could not be converted are emitted on this port<br>
  * 
  * @displayName CsvFormatter
  * @category Formatter
@@ -58,215 +68,152 @@ import com.datatorrent.netlet.util.DTThrowable;
 public class CsvFormatter extends Formatter<String>
 {
 
-  private ArrayList<Field> fields;
-  @NotNull
-  protected String classname;
-  @NotNull
-  protected int fieldDelimiter;
-  protected String lineDelimiter;
+  /**
+   * Names of all the fields, in the same order as they appear in output
+   * records
+   */
+  private transient String[] nameMapping;
+  /**
+   * Cell processors are an integral part of reading and writing with Super CSV;
+   * they automate the data type conversions and enforce constraints.
+   */
+  private transient CellProcessor[] processors;
+  /**
+   * Writing preferences that are passed through schema
+   */
+  private transient CsvPreference preference;
 
+  /**
+   * Contents of the schema. The schema is specified in JSON format as per
+   * {@link DelimitedSchema}
+   */
   @NotNull
-  protected String fieldInfo;
+  private String schema;
+  /**
+   * Schema is read into this object to access fields
+   */
+  private transient DelimitedSchema delimitedParserSchema;
 
-  public enum FIELD_TYPE
-  {
-    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
-  };
+  /**
+   * Metric to keep count of the number of tuples emitted on the error port
+   */
+  @AutoMetric
+  private long errorTupleCount;
 
-  protected transient String[] nameMapping;
-  protected transient CellProcessor[] processors;
-  protected transient CsvPreference preference;
+  /**
+   * Metric to keep count of the number of tuples emitted on the out port
+   */
+  @AutoMetric
+  private long emittedObjectCount;
 
-  public CsvFormatter()
-  {
-    fields = new ArrayList<Field>();
-    fieldDelimiter = ',';
-    lineDelimiter = "\r\n";
+  /**
+   * Metric to keep count of the number of tuples received on the input port
+   */
+  @AutoMetric
+  private long incomingTuplesCount;
 
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    errorTupleCount = 0;
+    emittedObjectCount = 0;
+    incomingTuplesCount = 0;
   }
 
   @Override
   public void setup(Context.OperatorContext context)
   {
     super.setup(context);
-
-    //fieldInfo information
-    fields = new ArrayList<Field>();
-    String[] fieldInfoTuple = fieldInfo.split(",");
-    for (int i = 0; i < fieldInfoTuple.length; i++) {
-      String[] fieldTuple = fieldInfoTuple[i].split(":");
-      Field field = new Field();
-      field.setName(fieldTuple[0]);
-      String[] typeFormat = fieldTuple[1].split("\\|");
-      field.setType(typeFormat[0].toUpperCase());
-      if (typeFormat.length > 1) {
-        field.setFormat(typeFormat[1]);
-      }
-      getFields().add(field);
-    }
-    preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
-    int countKeyValue = getFields().size();
-    nameMapping = new String[countKeyValue];
-    processors = new CellProcessor[countKeyValue];
-    initialise(nameMapping, processors);
-
+    delimitedParserSchema = new DelimitedSchema(schema);
+    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
+        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
+    nameMapping = delimitedParserSchema.getFieldNames()
+        .toArray(new String[delimitedParserSchema.getFieldNames().size()]);
+    processors = getProcessor(delimitedParserSchema.getFields());
   }
 
-  private void initialise(String[] nameMapping, CellProcessor[] processors)
+  /**
+   * Returns an array of cell processors, one for each field
+   */
+  private CellProcessor[] getProcessor(List<Field> fields)
   {
-    for (int i = 0; i < getFields().size(); i++) {
-      FIELD_TYPE type = getFields().get(i).type;
-      nameMapping[i] = getFields().get(i).name;
-      if (type == FIELD_TYPE.DATE) {
-        String dateFormat = getFields().get(i).format;
-        processors[i] = new Optional(new FmtDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
+    CellProcessor[] processor = new CellProcessor[fields.size()];
+    int fieldCount = 0;
+    for (Field field : fields) {
+      if (field.getType() == FieldType.DATE) {
+        String format = field.getConstraints().get(DelimitedSchema.DATE_FORMAT) == null ? null
+            : (String)field.getConstraints().get(DelimitedSchema.DATE_FORMAT);
+        processor[fieldCount++] = new Optional(new FmtDate(format == null ? "dd/MM/yyyy" : format));
       } else {
-        processors[i] = new Optional();
+        processor[fieldCount++] = new Optional();
       }
     }
-
+    return processor;
   }
 
   @Override
   public String convert(Object tuple)
   {
+    incomingTuplesCount++;
+    if (tuple == null) {
+      errorTupleCount++;
+      logger.error("Null tuple {}", tuple);
+      return null;
+    }
     try {
       StringWriter stringWriter = new StringWriter();
       ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, preference);
       beanWriter.write(tuple, nameMapping, processors);
       beanWriter.flush();
       beanWriter.close();
+      emittedObjectCount++;
       return stringWriter.toString();
     } catch (SuperCsvException e) {
-      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
+      logger.error("Error while converting tuple {} {}", tuple, e.getMessage());
+      errorTupleCount++;
     } catch (IOException e) {
       DTThrowable.rethrow(e);
     }
     return null;
   }
 
-  public static class Field
-  {
-    String name;
-    String format;
-    FIELD_TYPE type;
-
-    public String getName()
-    {
-      return name;
-    }
-
-    public void setName(String name)
-    {
-      this.name = name;
-    }
-
-    public FIELD_TYPE getType()
-    {
-      return type;
-    }
-
-    public void setType(String type)
-    {
-      this.type = FIELD_TYPE.valueOf(type);
-    }
-
-    public String getFormat()
-    {
-      return format;
-    }
-
-    public void setFormat(String format)
-    {
-      this.format = format;
-    }
-  }
-
   /**
-   * Gets the array list of the fields, a field being a POJO containing the name
-   * of the field and type of field.
+   * Get the schema
    * 
-   * @return An array list of Fields.
+   * @return schema
    */
-  public ArrayList<Field> getFields()
+  public String getSchema()
   {
-    return fields;
+    return schema;
   }
 
   /**
-   * Sets the array list of the fields, a field being a POJO containing the name
-   * of the field and type of field.
+   * Set the schema
    * 
-   * @param fields
-   *          An array list of Fields.
+   * @param schema
    */
-  public void setFields(ArrayList<Field> fields)
+  public void setSchema(String schema)
   {
-    this.fields = fields;
+    this.schema = schema;
   }
 
-  /**
-   * Gets the delimiter which separates fields in incoming data.
-   * 
-   * @return fieldDelimiter
-   */
-  public int getFieldDelimiter()
+  @VisibleForTesting
+  protected long getErrorTupleCount()
   {
-    return fieldDelimiter;
+    return errorTupleCount;
   }
 
-  /**
-   * Sets the delimiter which separates fields in incoming data.
-   * 
-   * @param fieldDelimiter
-   */
-  public void setFieldDelimiter(int fieldDelimiter)
+  @VisibleForTesting
+  protected long getEmittedObjectCount()
   {
-    this.fieldDelimiter = fieldDelimiter;
+    return emittedObjectCount;
   }
 
-  /**
-   * Gets the delimiter which separates lines in incoming data.
-   * 
-   * @return lineDelimiter
-   */
-  public String getLineDelimiter()
-  {
-    return lineDelimiter;
-  }
-
-  /**
-   * Sets the delimiter which separates line in incoming data.
-   * 
-   * @param lineDelimiter
-   */
-  public void setLineDelimiter(String lineDelimiter)
-  {
-    this.lineDelimiter = lineDelimiter;
-  }
-
-  /**
-   * Gets the name of the fields with type and format in data as comma separated
-   * string in same order as incoming data. e.g
-   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
-   * 
-   * @return fieldInfo
-   */
-  public String getFieldInfo()
-  {
-    return fieldInfo;
-  }
-
-  /**
-   * Sets the name of the fields with type and format in data as comma separated
-   * string in same order as incoming data. e.g
-   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
-   * 
-   * @param fieldInfo
-   */
-  public void setFieldInfo(String fieldInfo)
+  @VisibleForTesting
+  protected long getIncomingTuplesCount()
   {
-    this.fieldInfo = fieldInfo;
+    return incomingTuplesCount;
   }
 
   private static final Logger logger = LoggerFactory.getLogger(CsvFormatter.class);

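At the call site, configuring the reworked operator reduces to supplying the JSON schema and the POJO class. The sketch below is distilled from CsvParserApplication in the updated test that follows, so Ad and schema.json are that test's resources, not part of the operator:

    CsvFormatter formatter = dag.addOperator("formatter", new CsvFormatter());
    formatter.setSchema(SchemaUtils.jarResourceFileToString("schema.json"));
    dag.getMeta(formatter).getMeta(formatter.in).getAttributes()
        .put(Context.PortContext.TUPLE_CLASS, Ad.class);
    dag.addStream("output", formatter.out, output.input); // formatted CSV lines
    dag.addStream("err", formatter.err, error.input);     // tuples that failed conversion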
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3df6715d/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java
index 13d9739..4a1c770 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java
@@ -18,7 +18,9 @@
  */
 package com.datatorrent.contrib.formatter;
 
-import java.util.Date;
+import java.io.IOException;
+
+import javax.validation.ConstraintViolationException;
 
 import org.joda.time.DateTime;
 import org.junit.Assert;
@@ -27,13 +29,25 @@ import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
-import com.datatorrent.contrib.formatter.CsvFormatter;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.contrib.parser.CsvPOJOParserTest.Ad;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.TestUtils;
 
 public class CsvFormatterTest
 {
 
+  private static final String filename = "schema.json";
   CsvFormatter operator;
   CollectorTestSink<Object> validDataSink;
   CollectorTestSink<String> invalidDataSink;
@@ -49,8 +63,8 @@ public class CsvFormatterTest
     {
       super.starting(description);
       operator = new CsvFormatter();
-      operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
-      operator.setLineDelimiter("\r\n");
+      operator.setClazz(Ad.class);
+      operator.setSchema(SchemaUtils.jarResourceFileToString(filename));
       validDataSink = new CollectorTestSink<Object>();
       invalidDataSink = new CollectorTestSink<String>();
       TestUtils.setSink(operator.out, validDataSink);
@@ -70,95 +84,97 @@ public class CsvFormatterTest
   public void testPojoReaderToCsv()
   {
     operator.setup(null);
-    EmployeeBean emp = new EmployeeBean();
-    emp.setName("john");
-    emp.setDept("cs");
-    emp.setEid(1);
-    emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate());
-    operator.in.process(emp);
+    Ad ad = new Ad();
+    ad.setCampaignId(9823);
+    ad.setAdId(1234);
+    ad.setAdName("ad");
+    ad.setBidPrice(1.2);
+    ad.setStartDate(
+        new DateTime().withDate(2015, 1, 1).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).toDate());
+    ad.setEndDate(new DateTime().withDate(2016, 1, 1).toDate());
+    ad.setSecurityCode(12345678);
+    ad.setParentCampaign("CAMP_AD");
+    ad.setActive(true);
+    ad.setWeatherTargeted('y');
+    ad.setValid("valid");
+    operator.in.process(ad);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
     String csvOp = (String)validDataSink.collectedTuples.get(0);
     Assert.assertNotNull(csvOp);
-    Assert.assertEquals("john,cs,1,01/01/2015" + operator.getLineDelimiter(), csvOp);
+    Assert.assertEquals("1234,9823,ad,1.2,2015-01-01 00:00:00,01/01/2016,12345678,true,false,CAMP_AD,y,valid\r\n",
+        csvOp);
+    Assert.assertEquals(1, operator.getIncomingTuplesCount());
+    Assert.assertEquals(0, operator.getErrorTupleCount());
+    Assert.assertEquals(1, operator.getEmittedObjectCount());
   }
 
   @Test
-  public void testPojoReaderToCsvMultipleDate()
+  public void testPojoReaderToCsvNullInput()
   {
-    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date,dateOfBirth:date|dd-MMM-yyyy");
     operator.setup(null);
-    EmployeeBean emp = new EmployeeBean();
-    emp.setName("john");
-    emp.setDept("cs");
-    emp.setEid(1);
-    emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate());
-    emp.setDateOfBirth(new DateTime().withDate(2015, 1, 1).toDate());
-    operator.in.process(emp);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String csvOp = (String)validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(csvOp);
-    Assert.assertEquals("john,cs,1,01/01/2015,01-Jan-2015" + operator.getLineDelimiter(), csvOp);
+    operator.in.process(null);
+    Assert.assertEquals(0, validDataSink.collectedTuples.size());
+    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+    Assert.assertEquals(1, operator.getIncomingTuplesCount());
+    Assert.assertEquals(1, operator.getErrorTupleCount());
+    Assert.assertEquals(0, operator.getEmittedObjectCount());
+
   }
 
-  public static class EmployeeBean
+  @Test
+  public void testApplication() throws IOException, Exception
   {
-
-    private String name;
-    private String dept;
-    private int eid;
-    private Date dateOfJoining;
-    private Date dateOfBirth;
-
-    public String getName()
-    {
-      return name;
-    }
-
-    public void setName(String name)
-    {
-      this.name = name;
-    }
-
-    public String getDept()
-    {
-      return dept;
-    }
-
-    public void setDept(String dept)
-    {
-      this.dept = dept;
-    }
-
-    public int getEid()
-    {
-      return eid;
-    }
-
-    public void setEid(int eid)
-    {
-      this.eid = eid;
-    }
-
-    public Date getDateOfJoining()
-    {
-      return dateOfJoining;
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new CsvParserApplication(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.run(5000); // runs for 5 seconds and quits
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
     }
+  }
 
-    public void setDateOfJoining(Date dateOfJoining)
+  public static class CsvParserApplication implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
     {
-      this.dateOfJoining = dateOfJoining;
+      PojoEmitter input = dag.addOperator("data", new PojoEmitter());
+      CsvFormatter formatter = dag.addOperator("formatter", new CsvFormatter());
+      dag.getMeta(formatter).getMeta(formatter.in).getAttributes().put(Context.PortContext.TUPLE_CLASS, Ad.class);
+      formatter.setSchema(SchemaUtils.jarResourceFileToString("schema.json"));
+      ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator());
+      ConsoleOutputOperator error = dag.addOperator("error", new ConsoleOutputOperator());
+      output.setDebug(true);
+      dag.addStream("input", input.output, formatter.in);
+      dag.addStream("output", formatter.out, output.input);
+      dag.addStream("err", formatter.err, error.input);
     }
+  }
 
-    public Date getDateOfBirth()
-    {
-      return dateOfBirth;
-    }
+  public static class PojoEmitter extends BaseOperator implements InputOperator
+  {
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
 
-    public void setDateOfBirth(Date dateOfBirth)
+    @Override
+    public void emitTuples()
     {
-      this.dateOfBirth = dateOfBirth;
+      Ad ad = new Ad();
+      ad.setCampaignId(9823);
+      ad.setAdId(1234);
+      ad.setAdName("ad");
+      ad.setBidPrice(1.2);
+      ad.setStartDate(
+          new DateTime().withDate(2015, 1, 1).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).toDate());
+      ad.setEndDate(new DateTime().withDate(2016, 1, 1).toDate());
+      ad.setSecurityCode(12345678);
+      ad.setParentCampaign("CAMP_AD");
+      ad.setActive(true);
+      ad.setWeatherTargeted('y');
+      ad.setValid("valid");
+      output.emit(ad);
     }
   }
 


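As a footnote, the Super CSV write path that convert() wraps can be exercised on its own. Below is a minimal, self-contained sketch; the Ad bean and field names here are hypothetical stand-ins, not the test's Ad class:

    import java.io.StringWriter;

    import org.supercsv.cellprocessor.Optional;
    import org.supercsv.cellprocessor.ift.CellProcessor;
    import org.supercsv.io.CsvBeanWriter;
    import org.supercsv.io.ICsvBeanWriter;
    import org.supercsv.prefs.CsvPreference;

    public class SuperCsvSketch
    {
      // Hypothetical bean; any POJO with getters matching the name mapping works.
      public static class Ad
      {
        private int adId;
        private String adName;

        public int getAdId() { return adId; }
        public void setAdId(int adId) { this.adId = adId; }
        public String getAdName() { return adName; }
        public void setAdName(String adName) { this.adName = adName; }
      }

      public static void main(String[] args) throws Exception
      {
        Ad ad = new Ad();
        ad.setAdId(1234);
        ad.setAdName("ad");
        // One processor per field, in nameMapping order; Optional tolerates nulls,
        // mirroring what the operator builds in getProcessor() above.
        String[] nameMapping = {"adId", "adName"};
        CellProcessor[] processors = {new Optional(), new Optional()};
        StringWriter stringWriter = new StringWriter();
        ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, CsvPreference.STANDARD_PREFERENCE);
        beanWriter.write(ad, nameMapping, processors);
        beanWriter.flush();
        beanWriter.close();
        System.out.print(stringWriter.toString()); // prints: 1234,ad plus the line terminator
      }
    }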