apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tushar Gosavi <tus...@datatorrent.com>
Subject Re: incubator-apex-malhar git commit: MLHR-1838 Added pojo parsers and formatters(csv, json, xml)
Date Thu, 15 Oct 2015 10:07:24 GMT
My bad
I pulled the changes on my laptop and checked that compilation and the new
test cases passed before the merge.
I did not pay attention to the RAT check, sorry for that. We have fixed the
RAT check failure through PR#62.

Thanks,
- Tushar.


On Thu, Oct 15, 2015 at 9:15 AM, Thomas Weise <thomas@datatorrent.com>
wrote:

> This does not pass apache-rat:check
>
> https://api.travis-ci.org/jobs/85458770/log.txt?deansi=true
>
>
> *******************************
>
> Unapproved licenses:
>
>
> src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java
>
> src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java
>
> src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java
>   src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java
>   src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java
>   src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java
>
>
> On Wed, Oct 14, 2015 at 4:48 AM, <tushar@apache.org> wrote:
>
>> Repository: incubator-apex-malhar
>> Updated Branches:
>>   refs/heads/devel-3 e1a45507b -> 3f4fe1866
>>
>>
>> MLHR-1838  Added pojo parsers and formatters(csv,json,xml)
>>
>>
>> Project:
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
>> Commit:
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/3f4fe186
>> Tree:
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/3f4fe186
>> Diff:
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/3f4fe186
>>
>> Branch: refs/heads/devel-3
>> Commit: 3f4fe18665c59dadb8ad289f696df983bdc451ce
>> Parents: e1a4550
>> Author: shubham <shubham-pathak22@github.com>
>> Authored: Fri Sep 11 16:26:03 2015 +0530
>> Committer: shubham <shubham-pathak22@github.com>
>> Committed: Wed Oct 14 10:53:12 2015 +0530
>>
>> ----------------------------------------------------------------------
>>  contrib/pom.xml                                 |  12 +
>>  .../contrib/converter/Converter.java            |  43 +++
>>  .../contrib/schema/formatter/CsvFormatter.java  | 285 +++++++++++++++++
>>  .../contrib/schema/formatter/Formatter.java     | 101 ++++++
>>  .../contrib/schema/formatter/JsonFormatter.java | 109 +++++++
>>  .../contrib/schema/formatter/XmlFormatter.java  | 172 ++++++++++
>>  .../contrib/schema/parser/CsvParser.java        | 314 +++++++++++++++++++
>>  .../contrib/schema/parser/JsonParser.java       | 106 +++++++
>>  .../contrib/schema/parser/Parser.java           | 102 ++++++
>>  .../contrib/schema/parser/XmlParser.java        | 141 +++++++++
>>  .../schema/formatter/CsvFormatterTest.java      | 147 +++++++++
>>  .../schema/formatter/JsonFormatterTest.java     | 186 +++++++++++
>>  .../schema/formatter/XmlFormatterTest.java      | 226 +++++++++++++
>>  .../contrib/schema/parser/CsvParserTest.java    | 172 ++++++++++
>>  .../contrib/schema/parser/JsonParserTest.java   | 212 +++++++++++++
>>  .../contrib/schema/parser/XmlParserTest.java    | 254 +++++++++++++++
>>  16 files changed, 2582 insertions(+)
>> ----------------------------------------------------------------------
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/pom.xml
>> ----------------------------------------------------------------------
>> diff --git a/contrib/pom.xml b/contrib/pom.xml
>> index abed040..91ef5c7 100755
>> --- a/contrib/pom.xml
>> +++ b/contrib/pom.xml
>> @@ -606,5 +606,17 @@
>>        <version>${dt.framework.version}</version>
>>        <type>jar</type>
>>      </dependency>
>> +    <dependency>
>> +      <!-- required by Xml parser and formatter -->
>> +      <groupId>com.thoughtworks.xstream</groupId>
>> +      <artifactId>xstream</artifactId>
>> +      <version>1.4.8</version>
>> +    </dependency>
>> +    <dependency>
>> +      <!-- required by Csv parser and formatter -->
>> +      <groupId>net.sf.supercsv</groupId>
>> +      <artifactId>super-csv-joda</artifactId>
>> +      <version>2.3.1</version>
>> +    </dependency>
>>    </dependencies>
>>  </project>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java
>> b/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java
>> new file mode 100644
>> index 0000000..ebf2925
>> --- /dev/null
>> +++
>> b/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java
>> @@ -0,0 +1,43 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + *   http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,
>> + * software distributed under the License is distributed on an
>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> + * KIND, either express or implied.  See the License for the
>> + * specific language governing permissions and limitations
>> + * under the License.
>> + */
>> +package com.datatorrent.contrib.converter;
>> +
>> +import org.apache.hadoop.classification.InterfaceStability;
>> +
>> +/**
>> + * Operators that are converting tuples from one format to another must
>> + * implement this interface. Eg. Parsers or formatters , that parse data
>> of
>> + * certain format and convert them to another format.
>> + *
>> + * @param <INPUT>
>> + * @param <OUTPUT>
>> + */
>> +@InterfaceStability.Evolving
>> +public interface Converter<INPUT, OUTPUT>
>> +{
>> +  /**
>> +   * Provide the implementation for converting tuples from one format to
>> the
>> +   * other
>> +   *
>> +   * @param INPUT
>> +   *          tuple of certain format
>> +   * @return OUTPUT tuple of converted format
>> +   */
>> +  public OUTPUT convert(INPUT tuple);
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java
>> new file mode 100644
>> index 0000000..924acc6
>> --- /dev/null
>> +++
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java
>> @@ -0,0 +1,285 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + *   http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,
>> + * software distributed under the License is distributed on an
>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> + * KIND, either express or implied.  See the License for the
>> + * specific language governing permissions and limitations
>> + * under the License.
>> + */
>> +package com.datatorrent.contrib.schema.formatter;
>> +
>> +import java.io.IOException;
>> +import java.io.StringWriter;
>> +import java.util.ArrayList;
>> +
>> +import javax.validation.constraints.NotNull;
>> +
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +import org.supercsv.cellprocessor.FmtDate;
>> +import org.supercsv.cellprocessor.Optional;
>> +import org.supercsv.cellprocessor.ift.CellProcessor;
>> +import org.supercsv.exception.SuperCsvException;
>> +import org.supercsv.io.CsvBeanWriter;
>> +import org.supercsv.io.ICsvBeanWriter;
>> +import org.supercsv.prefs.CsvPreference;
>> +
>> +import org.apache.hadoop.classification.InterfaceStability;
>> +
>> +import com.datatorrent.api.Context;
>> +import com.datatorrent.netlet.util.DTThrowable;
>> +
>> +/**
>> + * Operator that converts POJO to CSV string <br>
>> + * Assumption is that each field in the delimited data should map to a
>> simple
>> + * java type.<br>
>> + * <br>
>> + * <b>Properties</b> <br>
>> + * <b>fieldInfo</b>:User need to specify fields and their types as a
>> comma
>> + * separated string having format
>> &lt;NAME&gt;:&lt;TYPE&gt;|&lt;FORMAT&gt; in
>> + * the same order as incoming data. FORMAT refers to dates with
>> dd/mm/yyyy as
>> + * default e.g
>> name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
>> + *
>> + * @displayName CsvFormatter
>> + * @category Formatter
>> + * @tags pojo csv formatter
>> + */
>> +@InterfaceStability.Evolving
>> +public class CsvFormatter extends Formatter<String>
>> +{
>> +
>> +  private ArrayList<Field> fields;
>> +  @NotNull
>> +  protected String classname;
>> +  @NotNull
>> +  protected int fieldDelimiter;
>> +  protected String lineDelimiter;
>> +
>> +  @NotNull
>> +  protected String fieldInfo;
>> +
>> +  public enum FIELD_TYPE
>> +  {
>> +    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
>> +  };
>> +
>> +  protected transient String[] nameMapping;
>> +  protected transient CellProcessor[] processors;
>> +  protected transient CsvPreference preference;
>> +
>> +  public CsvFormatter()
>> +  {
>> +    fields = new ArrayList<Field>();
>> +    fieldDelimiter = ',';
>> +    lineDelimiter = "\r\n";
>> +
>> +  }
>> +
>> +  @Override
>> +  public void setup(Context.OperatorContext context)
>> +  {
>> +    super.setup(context);
>> +
>> +    //fieldInfo information
>> +    fields = new ArrayList<Field>();
>> +    String[] fieldInfoTuple = fieldInfo.split(",");
>> +    for (int i = 0; i < fieldInfoTuple.length; i++) {
>> +      String[] fieldTuple = fieldInfoTuple[i].split(":");
>> +      Field field = new Field();
>> +      field.setName(fieldTuple[0]);
>> +      String[] typeFormat = fieldTuple[1].split("\\|");
>> +      field.setType(typeFormat[0].toUpperCase());
>> +      if (typeFormat.length > 1) {
>> +        field.setFormat(typeFormat[1]);
>> +      }
>> +      getFields().add(field);
>> +    }
>> +    preference = new CsvPreference.Builder('"', fieldDelimiter,
>> lineDelimiter).build();
>> +    int countKeyValue = getFields().size();
>> +    nameMapping = new String[countKeyValue];
>> +    processors = new CellProcessor[countKeyValue];
>> +    initialise(nameMapping, processors);
>> +
>> +  }
>> +
>> +  private void initialise(String[] nameMapping, CellProcessor[]
>> processors)
>> +  {
>> +    for (int i = 0; i < getFields().size(); i++) {
>> +      FIELD_TYPE type = getFields().get(i).type;
>> +      nameMapping[i] = getFields().get(i).name;
>> +      if (type == FIELD_TYPE.DATE) {
>> +        String dateFormat = getFields().get(i).format;
>> +        processors[i] = new Optional(new FmtDate(dateFormat == null ?
>> "dd/MM/yyyy" : dateFormat));
>> +      } else {
>> +        processors[i] = new Optional();
>> +      }
>> +    }
>> +
>> +  }
>> +
>> +  @Override
>> +  public void activate(Context context)
>> +  {
>> +
>> +  }
>> +
>> +  @Override
>> +  public void deactivate()
>> +  {
>> +
>> +  }
>> +
>> +  @Override
>> +  public String convert(Object tuple)
>> +  {
>> +    try {
>> +      StringWriter stringWriter = new StringWriter();
>> +      ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter,
>> preference);
>> +      beanWriter.write(tuple, nameMapping, processors);
>> +      beanWriter.flush();
>> +      beanWriter.close();
>> +      return stringWriter.toString();
>> +    } catch (SuperCsvException e) {
>> +      logger.debug("Error while converting tuple {}
>> {}",tuple,e.getMessage());
>> +    } catch (IOException e) {
>> +      DTThrowable.rethrow(e);
>> +    }
>> +    return null;
>> +  }
>> +
>> +  public static class Field
>> +  {
>> +    String name;
>> +    String format;
>> +    FIELD_TYPE type;
>> +
>> +    public String getName()
>> +    {
>> +      return name;
>> +    }
>> +
>> +    public void setName(String name)
>> +    {
>> +      this.name = name;
>> +    }
>> +
>> +    public FIELD_TYPE getType()
>> +    {
>> +      return type;
>> +    }
>> +
>> +    public void setType(String type)
>> +    {
>> +      this.type = FIELD_TYPE.valueOf(type);
>> +    }
>> +
>> +    public String getFormat()
>> +    {
>> +      return format;
>> +    }
>> +
>> +    public void setFormat(String format)
>> +    {
>> +      this.format = format;
>> +    }
>> +  }
>> +
>> +  /**
>> +   * Gets the array list of the fields, a field being a POJO containing
>> the name
>> +   * of the field and type of field.
>> +   *
>> +   * @return An array list of Fields.
>> +   */
>> +  public ArrayList<Field> getFields()
>> +  {
>> +    return fields;
>> +  }
>> +
>> +  /**
>> +   * Sets the array list of the fields, a field being a POJO containing
>> the name
>> +   * of the field and type of field.
>> +   *
>> +   * @param fields
>> +   *          An array list of Fields.
>> +   */
>> +  public void setFields(ArrayList<Field> fields)
>> +  {
>> +    this.fields = fields;
>> +  }
>> +
>> +  /**
>> +   * Gets the delimiter which separates fields in incoming data.
>> +   *
>> +   * @return fieldDelimiter
>> +   */
>> +  public int getFieldDelimiter()
>> +  {
>> +    return fieldDelimiter;
>> +  }
>> +
>> +  /**
>> +   * Sets the delimiter which separates fields in incoming data.
>> +   *
>> +   * @param fieldDelimiter
>> +   */
>> +  public void setFieldDelimiter(int fieldDelimiter)
>> +  {
>> +    this.fieldDelimiter = fieldDelimiter;
>> +  }
>> +
>> +  /**
>> +   * Gets the delimiter which separates lines in incoming data.
>> +   *
>> +   * @return lineDelimiter
>> +   */
>> +  public String getLineDelimiter()
>> +  {
>> +    return lineDelimiter;
>> +  }
>> +
>> +  /**
>> +   * Sets the delimiter which separates line in incoming data.
>> +   *
>> +   * @param lineDelimiter
>> +   */
>> +  public void setLineDelimiter(String lineDelimiter)
>> +  {
>> +    this.lineDelimiter = lineDelimiter;
>> +  }
>> +
>> +  /**
>> +   * Gets the name of the fields with type and format in data as comma
>> separated
>> +   * string in same order as incoming data. e.g
>> +   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
>> +   *
>> +   * @return fieldInfo
>> +   */
>> +  public String getFieldInfo()
>> +  {
>> +    return fieldInfo;
>> +  }
>> +
>> +  /**
>> +   * Sets the name of the fields with type and format in data as comma
>> separated
>> +   * string in same order as incoming data. e.g
>> +   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
>> +   *
>> +   * @param fieldInfo
>> +   */
>> +  public void setFieldInfo(String fieldInfo)
>> +  {
>> +    this.fieldInfo = fieldInfo;
>> +  }
>> +
>> +  private static final Logger logger =
>> LoggerFactory.getLogger(CsvFormatter.class);
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java
>> new file mode 100644
>> index 0000000..19a78e0
>> --- /dev/null
>> +++
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java
>> @@ -0,0 +1,101 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + *   http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,
>> + * software distributed under the License is distributed on an
>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> + * KIND, either express or implied.  See the License for the
>> + * specific language governing permissions and limitations
>> + * under the License.
>> + */
>> +package com.datatorrent.contrib.schema.formatter;
>> +
>> +import org.apache.hadoop.classification.InterfaceStability;
>> +
>> +import com.datatorrent.api.Context;
>> +import com.datatorrent.api.Context.PortContext;
>> +import com.datatorrent.api.DefaultInputPort;
>> +import com.datatorrent.api.DefaultOutputPort;
>> +import com.datatorrent.api.Operator.ActivationListener;
>> +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
>> +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
>> +import com.datatorrent.common.util.BaseOperator;
>> +import com.datatorrent.contrib.converter.Converter;
>> +
>> +/**
>> + * Abstract class that implements Converter interface. This is a schema
>> enabled
>> + * Formatter <br>
>> + * Sub classes need to implement the convert method <br>
>> + * <b>Port Interface</b><br>
>> + * <b>in</b>: expects &lt;Object&gt; this is a schema enabled port<br>
>> + * <b>out</b>: emits &lt;OUTPUT&gt; <br>
>> + * <b>err</b>: emits &lt;Object&gt; error port that emits input tuple
>> that could
>> + * not be converted<br>
>> + * <br>
>> + *
>> + * @displayName Parser
>> + * @tags parser converter
>> + * @param <INPUT>
>> + */
>> +@InterfaceStability.Evolving
>> +public abstract class Formatter<OUTPUT> extends BaseOperator implements
>> Converter<Object, OUTPUT>,
>> +    ActivationListener<Context>
>> +{
>> +  protected transient Class<?> clazz;
>> +
>> +  @OutputPortFieldAnnotation
>> +  public transient DefaultOutputPort<OUTPUT> out = new
>> DefaultOutputPort<OUTPUT>();
>> +
>> +  @OutputPortFieldAnnotation(optional = true)
>> +  public transient DefaultOutputPort<Object> err = new
>> DefaultOutputPort<Object>();
>> +
>> +  @InputPortFieldAnnotation(schemaRequired = true)
>> +  public transient DefaultInputPort<Object> in = new
>> DefaultInputPort<Object>()
>> +  {
>> +    public void setup(PortContext context)
>> +    {
>> +      clazz = context.getValue(Context.PortContext.TUPLE_CLASS);
>> +    }
>> +
>> +    @Override
>> +    public void process(Object inputTuple)
>> +    {
>> +      OUTPUT tuple = convert(inputTuple);
>> +      if (tuple == null && err.isConnected()) {
>> +        err.emit(inputTuple);
>> +        return;
>> +      }
>> +      if (out.isConnected()) {
>> +        out.emit(tuple);
>> +      }
>> +    }
>> +  };
>> +
>> +  /**
>> +   * Get the class that needs to be formatted
>> +   *
>> +   * @return Class<?>
>> +   */
>> +  public Class<?> getClazz()
>> +  {
>> +    return clazz;
>> +  }
>> +
>> +  /**
>> +   * Set the class of tuple that needs to be formatted
>> +   *
>> +   * @param clazz
>> +   */
>> +  public void setClazz(Class<?> clazz)
>> +  {
>> +    this.clazz = clazz;
>> +  }
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java
>> new file mode 100644
>> index 0000000..344ac60
>> --- /dev/null
>> +++
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java
>> @@ -0,0 +1,109 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + *   http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,
>> + * software distributed under the License is distributed on an
>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> + * KIND, either express or implied.  See the License for the
>> + * specific language governing permissions and limitations
>> + * under the License.
>> + */
>> +package com.datatorrent.contrib.schema.formatter;
>> +
>> +import java.io.IOException;
>> +import java.text.SimpleDateFormat;
>> +
>> +import org.codehaus.jackson.JsonGenerationException;
>> +import org.codehaus.jackson.map.JsonMappingException;
>> +import org.codehaus.jackson.map.ObjectMapper;
>> +import org.codehaus.jackson.map.ObjectWriter;
>> +import org.codehaus.jackson.map.SerializationConfig;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +
>> +import org.apache.hadoop.classification.InterfaceStability;
>> +
>> +import com.datatorrent.api.Context;
>> +import com.datatorrent.netlet.util.DTThrowable;
>> +
>> +/**
>> + * Operator that converts POJO to JSON string <br>
>> + * <b>Properties</b> <br>
>> + * <b>dateFormat</b>: date format e.g dd/MM/yyyy
>> + *
>> + * @displayName JsonFormatter
>> + * @category Formatter
>> + * @tags pojo json formatter
>> + */
>> +@InterfaceStability.Evolving
>> +public class JsonFormatter extends Formatter<String>
>> +{
>> +  private transient ObjectWriter writer;
>> +  protected String dateFormat;
>> +
>> +  @Override
>> +  public void activate(Context context)
>> +  {
>> +    try {
>> +      ObjectMapper mapper = new ObjectMapper();
>> +      if (dateFormat != null) {
>> +        mapper.setDateFormat(new SimpleDateFormat(dateFormat));
>> +      }
>> +      writer = mapper.writerWithType(clazz);
>> +      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS,
>> true);
>> +      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS,
>> true);
>> +
>> mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, true);
>> +    } catch (Throwable e) {
>> +      throw new RuntimeException("Unable find provided class");
>> +    }
>> +  }
>> +
>> +  @Override
>> +  public void deactivate()
>> +  {
>> +
>> +  }
>> +
>> +  @Override
>> +  public String convert(Object tuple)
>> +  {
>> +    try {
>> +      return writer.writeValueAsString(tuple);
>> +    } catch (JsonGenerationException | JsonMappingException e) {
>> +      logger.debug("Error while converting tuple {}
>> {}",tuple,e.getMessage());
>> +    } catch (IOException e) {
>> +      DTThrowable.rethrow(e);
>> +    }
>> +    return null;
>> +  }
>> +
>> +  /**
>> +   * Get the date format
>> +   *
>> +   * @return Date format string
>> +   */
>> +  public String getDateFormat()
>> +  {
>> +    return dateFormat;
>> +  }
>> +
>> +  /**
>> +   * Set the date format
>> +   *
>> +   * @param dateFormat
>> +   */
>> +  public void setDateFormat(String dateFormat)
>> +  {
>> +    this.dateFormat = dateFormat;
>> +  }
>> +
>> +  private static final Logger logger =
>> LoggerFactory.getLogger(JsonFormatter.class);
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java
>> new file mode 100644
>> index 0000000..b387031
>> --- /dev/null
>> +++
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java
>> @@ -0,0 +1,172 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + *   http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,
>> + * software distributed under the License is distributed on an
>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> + * KIND, either express or implied.  See the License for the
>> + * specific language governing permissions and limitations
>> + * under the License.
>> + */
>> +package com.datatorrent.contrib.schema.formatter;
>> +
>> +import java.io.Writer;
>> +
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +
>> +import org.apache.hadoop.classification.InterfaceStability;
>> +
>> +import com.datatorrent.api.Context;
>> +
>> +import com.thoughtworks.xstream.XStream;
>> +import com.thoughtworks.xstream.XStreamException;
>> +import com.thoughtworks.xstream.converters.basic.DateConverter;
>> +import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
>> +import com.thoughtworks.xstream.io.xml.CompactWriter;
>> +import com.thoughtworks.xstream.io.xml.XppDriver;
>> +
>> +/**
>> + * @displayName XmlParser
>> + * @category Formatter
>> + * @tags xml pojo formatter
>> + */
>> +@InterfaceStability.Evolving
>> +public class XmlFormatter extends Formatter<String>
>> +{
>> +
>> +  private transient XStream xstream;
>> +
>> +  protected String alias;
>> +  protected String dateFormat;
>> +  protected boolean prettyPrint;
>> +
>> +  public XmlFormatter()
>> +  {
>> +    alias = null;
>> +    dateFormat = null;
>> +  }
>> +
>> +  @Override
>> +  public void activate(Context context)
>> +  {
>> +    if (prettyPrint) {
>> +      xstream = new XStream();
>> +    } else {
>> +      xstream = new XStream(new XppDriver()
>> +      {
>> +        @Override
>> +        public HierarchicalStreamWriter createWriter(Writer out)
>> +        {
>> +          return new CompactWriter(out, getNameCoder());
>> +        }
>> +      });
>> +    }
>> +    if (alias != null) {
>> +      try {
>> +        xstream.alias(alias, clazz);
>> +      } catch (Throwable e) {
>> +        throw new RuntimeException("Unable find provided class");
>> +      }
>> +    }
>> +    if (dateFormat != null) {
>> +      xstream.registerConverter(new DateConverter(dateFormat, new
>> String[] {}));
>> +    }
>> +  }
>> +
>> +  @Override
>> +  public void deactivate()
>> +  {
>> +
>> +  }
>> +
>> +  @Override
>> +  public String convert(Object tuple)
>> +  {
>> +    try {
>> +      return xstream.toXML(tuple);
>> +    } catch (XStreamException e) {
>> +      logger.debug("Error while converting tuple {} {}
>> ",tuple,e.getMessage());
>> +      return null;
>> +    }
>> +  }
>> +
>> +  /**
>> +   * Gets the alias This is an optional step. Without it XStream would
>> work
>> +   * fine, but the XML element names would contain the fully qualified
>> name of
>> +   * each class (including package) which would bulk up the XML a bit.
>> +   *
>> +   * @return alias.
>> +   */
>> +  public String getAlias()
>> +  {
>> +    return alias;
>> +  }
>> +
>> +  /**
>> +   * Sets the alias This is an optional step. Without it XStream would
>> work
>> +   * fine, but the XML element names would contain the fully qualified
>> name of
>> +   * each class (including package) which would bulk up the XML a bit.
>> +   *
>> +   * @param alias
>> +   *          .
>> +   */
>> +  public void setAlias(String alias)
>> +  {
>> +    this.alias = alias;
>> +  }
>> +
>> +  /**
>> +   * Gets the date format e.g dd/mm/yyyy - this will be how a date would
>> be
>> +   * formatted
>> +   *
>> +   * @return dateFormat.
>> +   */
>> +  public String getDateFormat()
>> +  {
>> +    return dateFormat;
>> +  }
>> +
>> +  /**
>> +   * Sets the date format e.g dd/mm/yyyy - this will be how a date would
>> be
>> +   * formatted
>> +   *
>> +   * @param dateFormat
>> +   *          .
>> +   */
>> +  public void setDateFormat(String dateFormat)
>> +  {
>> +    this.dateFormat = dateFormat;
>> +  }
>> +
>> +  /**
>> +   * Returns true if pretty print is enabled.
>> +   *
>> +   * @return prettyPrint
>> +   */
>> +  public boolean isPrettyPrint()
>> +  {
>> +    return prettyPrint;
>> +  }
>> +
>> +  /**
>> +   * Sets pretty print option.
>> +   *
>> +   * @param prettyPrint
>> +   */
>> +  public void setPrettyPrint(boolean prettyPrint)
>> +  {
>> +    this.prettyPrint = prettyPrint;
>> +  }
>> +
>> +  private static final Logger logger =
>> LoggerFactory.getLogger(XmlFormatter.class);
>> +
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java
>> new file mode 100644
>> index 0000000..4fd39fb
>> --- /dev/null
>> +++
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java
>> @@ -0,0 +1,314 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + *   http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,
>> + * software distributed under the License is distributed on an
>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> + * KIND, either express or implied.  See the License for the
>> + * specific language governing permissions and limitations
>> + * under the License.
>> + */
>> +package com.datatorrent.contrib.schema.parser;
>> +
>> +import java.io.IOException;
>> +import java.util.ArrayList;
>> +
>> +import javax.validation.constraints.NotNull;
>> +
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +import org.supercsv.cellprocessor.Optional;
>> +import org.supercsv.cellprocessor.ParseBool;
>> +import org.supercsv.cellprocessor.ParseChar;
>> +import org.supercsv.cellprocessor.ParseDate;
>> +import org.supercsv.cellprocessor.ParseDouble;
>> +import org.supercsv.cellprocessor.ParseInt;
>> +import org.supercsv.cellprocessor.ParseLong;
>> +import org.supercsv.cellprocessor.ift.CellProcessor;
>> +import org.supercsv.io.CsvBeanReader;
>> +import org.supercsv.prefs.CsvPreference;
>> +
>> +import org.apache.hadoop.classification.InterfaceStability;
>> +
>> +import com.datatorrent.api.Context;
>> +import com.datatorrent.api.Context.OperatorContext;
>> +import com.datatorrent.lib.util.ReusableStringReader;
>> +import com.datatorrent.netlet.util.DTThrowable;
>> +
>> +/**
>> + * Operator that converts CSV string to Pojo <br>
>> + * Assumption is that each field in the delimited data should map to a
>> simple
>> + * java type.<br>
>> + * <br>
>> + * <b>Properties</b> <br>
>> + * <b>fieldInfo</b>: User needs to specify fields and their types as a
>> comma
>> + * separated string having format
>> &lt;NAME&gt;:&lt;TYPE&gt;|&lt;FORMAT&gt; in
>> + * the same order as incoming data. FORMAT refers to dates with
>> dd/MM/yyyy as
>> + * default e.g
>> name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy <br>
>> + * <b>fieldDelimiter</b>: Default is comma <br>
>> + * <b>lineDelimiter</b>: Default is '\r\n'
>> + *
>> + * @displayName CsvParser
>> + * @category Parsers
>> + * @tags csv pojo parser
>> + */
>> +@InterfaceStability.Evolving
>> +public class CsvParser extends Parser<String>
>> +{
>> +
>> +  private ArrayList<Field> fields;
>> +  @NotNull
>> +  protected int fieldDelimiter;
>> +  protected String lineDelimiter;
>> +
>> +  @NotNull
>> +  protected String fieldInfo;
>> +
>> +  protected transient String[] nameMapping;
>> +  protected transient CellProcessor[] processors;
>> +  private transient CsvBeanReader csvReader;
>> +
>> +  public enum FIELD_TYPE
>> +  {
>> +    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
>> +  };
>> +
>> +  @NotNull
>> +  private transient ReusableStringReader csvStringReader = new
>> ReusableStringReader();
>> +
>> +  public CsvParser()
>> +  {
>> +    fields = new ArrayList<Field>();
>> +    fieldDelimiter = ',';
>> +    lineDelimiter = "\r\n";
>> +  }
>> +
>> +  @Override
>> +  public void setup(OperatorContext context)
>> +  {
>> +    super.setup(context);
>> +
>> +    logger.info("field info {}", fieldInfo);
>> +    fields = new ArrayList<Field>();
>> +    String[] fieldInfoTuple = fieldInfo.split(",");
>> +    for (int i = 0; i < fieldInfoTuple.length; i++) {
>> +      String[] fieldTuple = fieldInfoTuple[i].split(":");
>> +      Field field = new Field();
>> +      field.setName(fieldTuple[0]);
>> +      String[] typeFormat = fieldTuple[1].split("\\|");
>> +      field.setType(typeFormat[0].toUpperCase());
>> +      if (typeFormat.length > 1) {
>> +        field.setFormat(typeFormat[1]);
>> +      }
>> +      getFields().add(field);
>> +    }
>> +
>> +    CsvPreference preference = new CsvPreference.Builder('"',
>> fieldDelimiter, lineDelimiter).build();
>> +    csvReader = new CsvBeanReader(csvStringReader, preference);
>> +    int countKeyValue = getFields().size();
>> +    logger.info("countKeyValue {}", countKeyValue);
>> +    nameMapping = new String[countKeyValue];
>> +    processors = new CellProcessor[countKeyValue];
>> +    initialise(nameMapping, processors);
>> +  }
>> +
>> +  private void initialise(String[] nameMapping, CellProcessor[]
>> processors)
>> +  {
>> +    for (int i = 0; i < getFields().size(); i++) {
>> +      FIELD_TYPE type = getFields().get(i).type;
>> +      nameMapping[i] = getFields().get(i).name;
>> +      if (type == FIELD_TYPE.DOUBLE) {
>> +        processors[i] = new Optional(new ParseDouble());
>> +      } else if (type == FIELD_TYPE.INTEGER) {
>> +        processors[i] = new Optional(new ParseInt());
>> +      } else if (type == FIELD_TYPE.FLOAT) {
>> +        processors[i] = new Optional(new ParseDouble());
>> +      } else if (type == FIELD_TYPE.LONG) {
>> +        processors[i] = new Optional(new ParseLong());
>> +      } else if (type == FIELD_TYPE.SHORT) {
>> +        processors[i] = new Optional(new ParseInt());
>> +      } else if (type == FIELD_TYPE.STRING) {
>> +        processors[i] = new Optional();
>> +      } else if (type == FIELD_TYPE.CHARACTER) {
>> +        processors[i] = new Optional(new ParseChar());
>> +      } else if (type == FIELD_TYPE.BOOLEAN) {
>> +        processors[i] = new Optional(new ParseBool());
>> +      } else if (type == FIELD_TYPE.DATE) {
>> +        String dateFormat = getFields().get(i).format;
>> +        processors[i] = new Optional(new ParseDate(dateFormat == null ?
>> "dd/MM/yyyy" : dateFormat));
>> +      }
>> +    }
>> +  }
>> +
>> +  @Override
>> +  public void activate(Context context)
>> +  {
>> +
>> +  }
>> +
>> +  @Override
>> +  public void deactivate()
>> +  {
>> +
>> +  }
>> +
>> +  @Override
>> +  public Object convert(String tuple)
>> +  {
>> +    try {
>> +      csvStringReader.open(tuple);
>> +      return csvReader.read(clazz, nameMapping, processors);
>> +    } catch (IOException e) {
>> +      logger.debug("Error while converting tuple {}
>> {}",tuple,e.getMessage());
>> +      return null;
>> +    }
>> +  }
>> +
>> +  @Override
>> +  public void teardown()
>> +  {
>> +    try {
>> +      if (csvReader != null) {
>> +        csvReader.close();
>> +      }
>> +    } catch (IOException e) {
>> +      DTThrowable.rethrow(e);
>> +    }
>> +  }
>> +
>> +  public static class Field
>> +  {
>> +    String name;
>> +    String format;
>> +    FIELD_TYPE type;
>> +
>> +    public String getName()
>> +    {
>> +      return name;
>> +    }
>> +
>> +    public void setName(String name)
>> +    {
>> +      this.name = name;
>> +    }
>> +
>> +    public FIELD_TYPE getType()
>> +    {
>> +      return type;
>> +    }
>> +
>> +    public void setType(String type)
>> +    {
>> +      this.type = FIELD_TYPE.valueOf(type);
>> +    }
>> +
>> +    public String getFormat()
>> +    {
>> +      return format;
>> +    }
>> +
>> +    public void setFormat(String format)
>> +    {
>> +      this.format = format;
>> +    }
>> +
>> +  }
>> +
>> +  /**
>> +   * Gets the array list of the fields, a field being a POJO containing
>> the name
>> +   * of the field and type of field.
>> +   *
>> +   * @return An array list of Fields.
>> +   */
>> +  public ArrayList<Field> getFields()
>> +  {
>> +    return fields;
>> +  }
>> +
>> +  /**
>> +   * Sets the array list of the fields, a field being a POJO containing
>> the name
>> +   * of the field and type of field.
>> +   *
>> +   * @param fields
>> +   *          An array list of Fields.
>> +   */
>> +  public void setFields(ArrayList<Field> fields)
>> +  {
>> +    this.fields = fields;
>> +  }
>> +
>> +  /**
>> +   * Gets the delimiter which separates fields in incoming data.
>> +   *
>> +   * @return fieldDelimiter
>> +   */
>> +  public int getFieldDelimiter()
>> +  {
>> +    return fieldDelimiter;
>> +  }
>> +
>> +  /**
>> +   * Sets the delimiter which separates fields in incoming data.
>> +   *
>> +   * @param fieldDelimiter
>> +   */
>> +  public void setFieldDelimiter(int fieldDelimiter)
>> +  {
>> +    this.fieldDelimiter = fieldDelimiter;
>> +  }
>> +
>> +  /**
>> +   * Gets the delimiter which separates lines in incoming data.
>> +   *
>> +   * @return lineDelimiter
>> +   */
>> +  public String getLineDelimiter()
>> +  {
>> +    return lineDelimiter;
>> +  }
>> +
>> +  /**
>> +   * Sets the delimiter which separates line in incoming data.
>> +   *
>> +   * @param lineDelimiter
>> +   */
>> +  public void setLineDelimiter(String lineDelimiter)
>> +  {
>> +    this.lineDelimiter = lineDelimiter;
>> +  }
>> +
>> +  /**
>> +   * Gets the names of the fields with type and format (for dates) as a
>> comma
>> +   * separated string in the same order as incoming data. e.g
>> +   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy
>> +   *
>> +   * @return fieldInfo
>> +   */
>> +  public String getFieldInfo()
>> +  {
>> +    return fieldInfo;
>> +  }
>> +
>> +  /**
>> +   * Sets the names of the fields with type and format (for dates) as a
>> comma
>> +   * separated string in the same order as incoming data. e.g
>> +   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy
>> +   *
>> +   * @param fieldInfo
>> +   */
>> +  public void setFieldInfo(String fieldInfo)
>> +  {
>> +    this.fieldInfo = fieldInfo;
>> +  }
>> +
>> +  private static final Logger logger =
>> LoggerFactory.getLogger(CsvParser.class);
>> +
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java
>> new file mode 100644
>> index 0000000..db45b33
>> --- /dev/null
>> +++
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java
>> @@ -0,0 +1,106 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + *   http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,
>> + * software distributed under the License is distributed on an
>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> + * KIND, either express or implied.  See the License for the
>> + * specific language governing permissions and limitations
>> + * under the License.
>> + */
>> +package com.datatorrent.contrib.schema.parser;
>> +
>> +import java.io.IOException;
>> +import java.text.SimpleDateFormat;
>> +
>> +import org.codehaus.jackson.JsonProcessingException;
>> +import org.codehaus.jackson.map.DeserializationConfig;
>> +import org.codehaus.jackson.map.ObjectMapper;
>> +import org.codehaus.jackson.map.ObjectReader;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +
>> +import org.apache.hadoop.classification.InterfaceStability;
>> +
>> +import com.datatorrent.api.Context;
>> +import com.datatorrent.netlet.util.DTThrowable;
>> +
>> +/**
>> + * Operator that converts JSON string to Pojo <br>
>> + * <b>Properties</b> <br>
>> + * <b>dateFormat</b>: date format e.g dd/MM/yyyy
>> + *
>> + * @displayName JsonParser
>> + * @category Parsers
>> + * @tags json pojo parser
>> + */
>> +@InterfaceStability.Evolving
>> +public class JsonParser extends Parser<String>
>> +{
>> +
>> +  private transient ObjectReader reader;
>> +  protected String dateFormat;
>> +
>> +  @Override
>> +  public void activate(Context context)
>> +  {
>> +    try {
>> +      ObjectMapper mapper = new ObjectMapper();
>> +
>> mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,
>> false);
>> +      if (dateFormat != null) {
>> +        mapper.setDateFormat(new SimpleDateFormat(dateFormat));
>> +      }
>> +      reader = mapper.reader(clazz);
>> +    } catch (Throwable e) {
>> +      throw new RuntimeException("Unable find provided class");
>> +    }
>> +  }
>> +
>> +  @Override
>> +  public void deactivate()
>> +  {
>> +  }
>> +
>> +  @Override
>> +  public Object convert(String tuple)
>> +  {
>> +    try {
>> +      return reader.readValue(tuple);
>> +    } catch (JsonProcessingException e) {
>> +      logger.debug("Error while converting tuple {}
>> {}",tuple,e.getMessage());
>> +    } catch (IOException e) {
>> +      DTThrowable.rethrow(e);
>> +    }
>> +    return null;
>> +  }
>> +
>> +  /**
>> +   * Get the date format
>> +   *
>> +   * @return Date format string
>> +   */
>> +  public String getDateFormat()
>> +  {
>> +    return dateFormat;
>> +  }
>> +
>> +  /**
>> +   * Set the date format
>> +   *
>> +   * @param dateFormat
>> +   */
>> +  public void setDateFormat(String dateFormat)
>> +  {
>> +    this.dateFormat = dateFormat;
>> +  }
>> +
>> +  private static final Logger logger =
>> LoggerFactory.getLogger(JsonParser.class);
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java
>> new file mode 100644
>> index 0000000..e5ff7f5
>> --- /dev/null
>> +++
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java
>> @@ -0,0 +1,102 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + *   http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,
>> + * software distributed under the License is distributed on an
>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> + * KIND, either express or implied.  See the License for the
>> + * specific language governing permissions and limitations
>> + * under the License.
>> + */
>> +package com.datatorrent.contrib.schema.parser;
>> +
>> +import org.apache.hadoop.classification.InterfaceStability;
>> +
>> +import com.datatorrent.api.Context;
>> +import com.datatorrent.api.Context.PortContext;
>> +import com.datatorrent.api.DefaultInputPort;
>> +import com.datatorrent.api.DefaultOutputPort;
>> +import com.datatorrent.api.Operator.ActivationListener;
>> +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
>> +import com.datatorrent.common.util.BaseOperator;
>> +import com.datatorrent.contrib.converter.Converter;
>> +
>> +/**
>> + * Abstract class that implements Converter interface. This is a schema
>> enabled
>> + * Parser <br>
>> + * Sub classes need to implement the convert method <br>
>> + * <br>
>> + * <b>Port Interface</b><br>
>> + * <b>in</b>: expects &lt;INPUT&gt;<br>
>> + * <b>out</b>: emits &lt;Object&gt; this is a schema enabled port<br>
>> + * <b>err</b>: emits &lt;INPUT&gt; error port that emits input tuple
>> that could
>> + * not be converted<br>
>> + * <br>
>> + *
>> + * @displayName Parser
>> + * @tags parser converter
>> + * @param <INPUT>
>> + */
>> +@InterfaceStability.Evolving
>> +public abstract class Parser<INPUT> extends BaseOperator implements
>> Converter<INPUT, Object>,
>> +    ActivationListener<Context>
>> +{
>> +  protected transient Class<?> clazz;
>> +
>> +  @OutputPortFieldAnnotation(schemaRequired = true)
>> +  public transient DefaultOutputPort<Object> out = new
>> DefaultOutputPort<Object>()
>> +  {
>> +    public void setup(PortContext context)
>> +    {
>> +      clazz = context.getValue(Context.PortContext.TUPLE_CLASS);
>> +    }
>> +  };
>> +
>> +  @OutputPortFieldAnnotation(optional = true)
>> +  public transient DefaultOutputPort<INPUT> err = new
>> DefaultOutputPort<INPUT>();
>> +
>> +  public transient DefaultInputPort<INPUT> in = new
>> DefaultInputPort<INPUT>()
>> +  {
>> +    @Override
>> +    public void process(INPUT inputTuple)
>> +    {
>> +      Object tuple = convert(inputTuple);
>> +      if (tuple == null && err.isConnected()) {
>> +        err.emit(inputTuple);
>> +        return;
>> +      }
>> +      if (out.isConnected()) {
>> +        out.emit(tuple);
>> +      }
>> +    }
>> +  };
>> +
>> +  /**
>> +   * Get the class of the POJO that input tuples are parsed into
>> +   *
>> +   * @return Class<?>
>> +   */
>> +  public Class<?> getClazz()
>> +  {
>> +    return clazz;
>> +  }
>> +
>> +  /**
>> +   * Set the class of the POJO that input tuples are parsed into
>> +   *
>> +   * @param clazz
>> +   */
>> +  public void setClazz(Class<?> clazz)
>> +  {
>> +    this.clazz = clazz;
>> +  }
>> +
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java
>> new file mode 100644
>> index 0000000..4931497
>> --- /dev/null
>> +++
>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java
>> @@ -0,0 +1,141 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + *   http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,
>> + * software distributed under the License is distributed on an
>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> + * KIND, either express or implied.  See the License for the
>> + * specific language governing permissions and limitations
>> + * under the License.
>> + */
>> +package com.datatorrent.contrib.schema.parser;
>> +
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +
>> +import org.apache.hadoop.classification.InterfaceStability;
>> +
>> +import com.thoughtworks.xstream.XStream;
>> +import com.thoughtworks.xstream.XStreamException;
>> +import com.thoughtworks.xstream.converters.basic.DateConverter;
>> +
>> +import com.datatorrent.api.Context;
>> +
>> +/**
>> + * Operator that converts XML string to Pojo <br>
>> + * <b>Properties</b> <br>
>> + * <b>alias</b>:This maps to the root element of the XML string. If not
>> + * specified, parser would expect the root element to be fully qualified
>> name of
>> + * the Pojo Class. <br>
>> + * <b>dateFormats</b>: Comma separated string of date formats e.g
>> + * dd/MM/yyyy,dd-MMM-yyyy where the first one is considered the default
>> + *
>> + * @displayName XmlParser
>> + * @category Parsers
>> + * @tags xml pojo parser
>> + */
>> +@InterfaceStability.Evolving
>> +public class XmlParser extends Parser<String>
>> +{
>> +
>> +  private transient XStream xstream;
>> +  protected String alias;
>> +  protected String dateFormats;
>> +
>> +  public XmlParser()
>> +  {
>> +    alias = null;
>> +    dateFormats = null;
>> +  }
>> +
>> +  @Override
>> +  public void activate(Context context)
>> +  {
>> +    xstream = new XStream();
>> +    if (alias != null) {
>> +      try {
>> +        xstream.alias(alias, clazz);
>> +      } catch (Throwable e) {
>> +        throw new RuntimeException("Unable find provided class");
>> +      }
>> +    }
>> +    if (dateFormats != null) {
>> +      String[] dateFormat = dateFormats.split(",");
>> +      xstream.registerConverter(new DateConverter(dateFormat[0],
>> dateFormat));
>> +    }
>> +  }
>> +
>> +  @Override
>> +  public void deactivate()
>> +  {
>> +
>> +  }
>> +
>> +  @Override
>> +  public Object convert(String tuple)
>> +  {
>> +    try {
>> +      return xstream.fromXML(tuple);
>> +    } catch (XStreamException e) {
>> +      logger.debug("Error while converting tuple {} {}",
>> tuple,e.getMessage());
>> +      return null;
>> +    }
>> +  }
>> +
>> +  /**
>> +   * Gets the alias
>> +   *
>> +   * @return alias.
>> +   */
>> +  public String getAlias()
>> +  {
>> +    return alias;
>> +  }
>> +
>> +  /**
>> +   * Sets the alias. This maps to the root element of the XML string. If
>> not
>> +   * specified, the parser expects the root element to be the fully
>> qualified name
>> +   * of the Pojo Class.
>> +   *
>> +   * @param alias
>> +   *          .
>> +   */
>> +  public void setAlias(String alias)
>> +  {
>> +    this.alias = alias;
>> +  }
>> +
>> +  /**
>> +   * Gets the comma separated string of date formats e.g
>> dd/MM/yyyy,dd-MMM-yyyy
>> +   * where the first one is considered the default
>> +   *
>> +   * @return dateFormats.
>> +   */
>> +  public String getDateFormats()
>> +  {
>> +    return dateFormats;
>> +  }
>> +
>> +  /**
>> +   * Sets the comma separated string of date formats e.g
>> dd/MM/yyyy,dd-MMM-yyyy
>> +   * where the first one is considered the default
>> +   *
>> +   * @param dateFormats
>> +   *          .
>> +   */
>> +  public void setDateFormats(String dateFormats)
>> +  {
>> +    this.dateFormats = dateFormats;
>> +  }
>> +
>> +  private static final Logger logger =
>> LoggerFactory.getLogger(XmlParser.class);
>> +
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java
>> b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java
>> new file mode 100644
>> index 0000000..8ecc088
>> --- /dev/null
>> +++
>> b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java
>> @@ -0,0 +1,147 @@
>> +package com.datatorrent.contrib.schema.formatter;
>> +
>> +import java.util.Date;
>> +
>> +import org.joda.time.DateTime;
>> +import org.junit.Assert;
>> +import org.junit.Rule;
>> +import org.junit.Test;
>> +import org.junit.rules.TestWatcher;
>> +import org.junit.runner.Description;
>> +
>> +import com.datatorrent.contrib.schema.formatter.CsvFormatter;
>> +import com.datatorrent.lib.testbench.CollectorTestSink;
>> +import com.datatorrent.lib.util.TestUtils;
>> +
>> +public class CsvFormatterTest
>> +{
>> +
>> +  CsvFormatter operator;
>> +  CollectorTestSink<Object> validDataSink;
>> +  CollectorTestSink<String> invalidDataSink;
>> +
>> +  @Rule
>> +  public Watcher watcher = new Watcher();
>> +
>> +  public class Watcher extends TestWatcher
>> +  {
>> +
>> +    @Override
>> +    protected void starting(Description description)
>> +    {
>> +      super.starting(description);
>> +      operator = new CsvFormatter();
>> +
>> operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
>> +      operator.setLineDelimiter("\r\n");
>> +      validDataSink = new CollectorTestSink<Object>();
>> +      invalidDataSink = new CollectorTestSink<String>();
>> +      TestUtils.setSink(operator.out, validDataSink);
>> +      TestUtils.setSink(operator.err, invalidDataSink);
>> +    }
>> +
>> +    @Override
>> +    protected void finished(Description description)
>> +    {
>> +      super.finished(description);
>> +      operator.teardown();
>> +    }
>> +
>> +  }
>> +
>> +  @Test
>> +  public void testPojoReaderToCsv()
>> +  {
>> +    operator.setup(null);
>> +    EmployeeBean emp = new EmployeeBean();
>> +    emp.setName("john");
>> +    emp.setDept("cs");
>> +    emp.setEid(1);
>> +    emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate());
>> +    operator.in.process(emp);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    String csvOp = (String)validDataSink.collectedTuples.get(0);
>> +    Assert.assertNotNull(csvOp);
>> +    Assert.assertEquals("john,cs,1,01/01/2015" +
>> operator.getLineDelimiter(), csvOp);
>> +  }
>> +
>> +  @Test
>> +  public void testPojoReaderToCsvMultipleDate()
>> +  {
>> +
>> operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date,dateOfBirth:date|dd-MMM-yyyy");
>> +    operator.setup(null);
>> +    EmployeeBean emp = new EmployeeBean();
>> +    emp.setName("john");
>> +    emp.setDept("cs");
>> +    emp.setEid(1);
>> +    emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate());
>> +    emp.setDateOfBirth(new DateTime().withDate(2015, 1, 1).toDate());
>> +    operator.in.process(emp);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    String csvOp = (String)validDataSink.collectedTuples.get(0);
>> +    Assert.assertNotNull(csvOp);
>> +    Assert.assertEquals("john,cs,1,01/01/2015,01-Jan-2015" +
>> operator.getLineDelimiter(), csvOp);
>> +  }
>> +
>> +  public static class EmployeeBean
>> +  {
>> +
>> +    private String name;
>> +    private String dept;
>> +    private int eid;
>> +    private Date dateOfJoining;
>> +    private Date dateOfBirth;
>> +
>> +    public String getName()
>> +    {
>> +      return name;
>> +    }
>> +
>> +    public void setName(String name)
>> +    {
>> +      this.name = name;
>> +    }
>> +
>> +    public String getDept()
>> +    {
>> +      return dept;
>> +    }
>> +
>> +    public void setDept(String dept)
>> +    {
>> +      this.dept = dept;
>> +    }
>> +
>> +    public int getEid()
>> +    {
>> +      return eid;
>> +    }
>> +
>> +    public void setEid(int eid)
>> +    {
>> +      this.eid = eid;
>> +    }
>> +
>> +    public Date getDateOfJoining()
>> +    {
>> +      return dateOfJoining;
>> +    }
>> +
>> +    public void setDateOfJoining(Date dateOfJoining)
>> +    {
>> +      this.dateOfJoining = dateOfJoining;
>> +    }
>> +
>> +    public Date getDateOfBirth()
>> +    {
>> +      return dateOfBirth;
>> +    }
>> +
>> +    public void setDateOfBirth(Date dateOfBirth)
>> +    {
>> +      this.dateOfBirth = dateOfBirth;
>> +    }
>> +  }
>> +
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java
>> b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java
>> new file mode 100644
>> index 0000000..4040c63
>> --- /dev/null
>> +++
>> b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java
>> @@ -0,0 +1,186 @@
>> +package com.datatorrent.contrib.schema.formatter;
>> +
>> +import java.io.ByteArrayOutputStream;
>> +import java.io.File;
>> +import java.io.IOException;
>> +import java.io.PrintStream;
>> +import java.util.Date;
>> +import java.util.List;
>> +
>> +import org.apache.commons.io.FileUtils;
>> +import org.joda.time.DateTime;
>> +import org.junit.Assert;
>> +import org.junit.Rule;
>> +import org.junit.Test;
>> +import org.junit.runner.Description;
>> +
>> +import
>> com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
>> +import com.datatorrent.lib.testbench.CollectorTestSink;
>> +import com.datatorrent.lib.util.TestUtils;
>> +import com.datatorrent.lib.util.TestUtils.TestInfo;
>> +import com.google.common.collect.Lists;
>> +
>> +public class JsonFormatterTest
>> +{
>> +  JsonFormatter operator;
>> +  CollectorTestSink<Object> validDataSink;
>> +  CollectorTestSink<String> invalidDataSink;
>> +
>> +  final ByteArrayOutputStream myOut = new ByteArrayOutputStream();
>> +
>> +  public JsonFormatterTest()
>> +  {
>> +    // So that the output is cleaner.
>> +    System.setErr(new PrintStream(myOut));
>> +  }
>> +
>> +  @Rule
>> +  public TestInfo testMeta = new FSTestWatcher()
>> +  {
>> +    private void deleteDirectory()
>> +    {
>> +      try {
>> +        FileUtils.deleteDirectory(new File(getDir()));
>> +      } catch (IOException ex) {
>> +        throw new RuntimeException(ex);
>> +      }
>> +    }
>> +
>> +    @Override
>> +    protected void starting(Description descriptor)
>> +    {
>> +      super.starting(descriptor);
>> +      deleteDirectory();
>> +
>> +      operator = new JsonFormatter();
>> +
>> +      validDataSink = new CollectorTestSink<Object>();
>> +      invalidDataSink = new CollectorTestSink<String>();
>> +      TestUtils.setSink(operator.out, validDataSink);
>> +      TestUtils.setSink(operator.err, invalidDataSink);
>> +      operator.setup(null);
>> +      operator.activate(null);
>> +
>> +      operator.beginWindow(0);
>> +    }
>> +
>> +    @Override
>> +    protected void finished(Description description)
>> +    {
>> +      operator.endWindow();
>> +      operator.teardown();
>> +
>> +      deleteDirectory();
>> +      super.finished(description);
>> +    }
>> +  };
>> +
>> +  @Test
>> +  public void testJSONToPOJO()
>> +  {
>> +    Test1Pojo pojo = new Test1Pojo();
>> +    pojo.a = 123;
>> +    pojo.b = 234876274;
>> +    pojo.c = "HowAreYou?";
>> +    pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ");
>> +
>> +    operator.in.put(pojo);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    String expectedJSONString =
>> "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":null}";
>> +    Assert.assertEquals(expectedJSONString,
>> validDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  @Test
>> +  public void testJSONToPOJODate()
>> +  {
>> +    Test1Pojo pojo = new Test1Pojo();
>> +    pojo.a = 123;
>> +    pojo.b = 234876274;
>> +    pojo.c = "HowAreYou?";
>> +    pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ");
>> +    pojo.date = new
>> DateTime().withYear(2015).withMonthOfYear(9).withDayOfMonth(15).toDate();
>> +    operator.setDateFormat("dd-MM-yyyy");
>> +    operator.setup(null);
>> +    operator.activate(null);
>> +    operator.in.put(pojo);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    String expectedJSONString =
>> "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}";
>> +    Assert.assertEquals(expectedJSONString,
>> validDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  @Test
>> +  public void testJSONToPOJONullFields()
>> +  {
>> +    Test1Pojo pojo = new Test1Pojo();
>> +    pojo.a = 123;
>> +    pojo.b = 234876274;
>> +    pojo.c = "HowAreYou?";
>> +    pojo.d = null;
>> +
>> +    operator.in.put(pojo);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    String expectedJSONString =
>> "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":null,\"date\":null}";
>> +    Assert.assertEquals(expectedJSONString,
>> validDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  @Test
>> +  public void testJSONToPOJOEmptyPOJO()
>> +  {
>> +    Test1Pojo pojo = new Test1Pojo();
>> +    operator.in.put(pojo);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    String expectedJSONString =
>> "{\"a\":0,\"b\":0,\"c\":null,\"d\":null,\"date\":null}";
>> +    System.out.println(validDataSink.collectedTuples.get(0));
>> +    Assert.assertEquals(expectedJSONString,
>> validDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  @Test
>> +  public void testJSONToPOJONullPOJO()
>> +  {
>> +    operator.in.put(null);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    String expectedJSONString = "null";
>> +    Assert.assertEquals(expectedJSONString,
>> validDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  @Test
>> +  public void testJSONToPOJONoFieldPOJO()
>> +  {
>> +    operator.endWindow();
>> +    operator.teardown();
>> +    operator.setClazz(Test2Pojo.class);
>> +    operator.setup(null);
>> +    operator.beginWindow(1);
>> +
>> +    Test2Pojo o = new Test2Pojo();
>> +    operator.in.put(o);
>> +    Assert.assertEquals(0, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
>> +    Assert.assertEquals(o, invalidDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  public static class Test1Pojo
>> +  {
>> +    public int a;
>> +    public long b;
>> +    public String c;
>> +    public List<String> d;
>> +    public Date date;
>> +
>> +    @Override
>> +    public String toString()
>> +    {
>> +      return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]";
>> +    }
>> +  }
>> +
>> +  public static class Test2Pojo
>> +  {
>> +  }
>> +
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java
>> b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java
>> new file mode 100644
>> index 0000000..2bc1aec
>> --- /dev/null
>> +++
>> b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java
>> @@ -0,0 +1,226 @@
>> +package com.datatorrent.contrib.schema.formatter;
>> +
>> +import java.util.Date;
>> +
>> +import org.joda.time.DateTime;
>> +import org.junit.Assert;
>> +import org.junit.Rule;
>> +import org.junit.Test;
>> +import org.junit.rules.TestWatcher;
>> +import org.junit.runner.Description;
>> +
>> +import com.datatorrent.contrib.schema.formatter.XmlFormatter;
>> +import com.datatorrent.lib.testbench.CollectorTestSink;
>> +import com.datatorrent.lib.util.TestUtils;
>> +
>> +public class XmlFormatterTest
>> +{
>> +
>> +  XmlFormatter operator;
>> +  CollectorTestSink<Object> validDataSink;
>> +  CollectorTestSink<String> invalidDataSink;
>> +
>> +  @Rule
>> +  public Watcher watcher = new Watcher();
>> +
>> +  public class Watcher extends TestWatcher
>> +  {
>> +
>> +    @Override
>> +    protected void starting(Description description)
>> +    {
>> +      super.starting(description);
>> +      operator = new XmlFormatter();
>> +      operator.setClazz(EmployeeBean.class);
>> +      operator.setDateFormat("yyyy-MM-dd");
>> +      validDataSink = new CollectorTestSink<Object>();
>> +      invalidDataSink = new CollectorTestSink<String>();
>> +      TestUtils.setSink(operator.out, validDataSink);
>> +      TestUtils.setSink(operator.err, invalidDataSink);
>> +    }
>> +
>> +    @Override
>> +    protected void finished(Description description)
>> +    {
>> +      super.finished(description);
>> +      operator.teardown();
>> +    }
>> +
>> +  }
>> +
>> +  @Test
>> +  public void testPojoToXmlWithoutAlias()
>> +  {
>> +    EmployeeBean e = new EmployeeBean();
>> +    e.setName("john");
>> +    e.setEid(1);
>> +    e.setDept("cs");
>> +    e.setDateOfJoining(new
>> DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate());
>> +
>> +    operator.setup(null);
>> +    operator.activate(null);
>> +    operator.in.process(e);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    String expected =
>> "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>"
>> + "<name>john</name>"
>> +        + "<dept>cs</dept>" + "<eid>1</eid>" +
>> "<dateOfJoining>2015-01-01</dateOfJoining>"
>> +        +
>> "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>";
>> +    Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  @Test
>> +  public void testXmlToPojoWithAlias()
>> +  {
>> +    EmployeeBean e = new EmployeeBean();
>> +    e.setName("john");
>> +    e.setEid(1);
>> +    e.setDept("cs");
>> +    e.setDateOfJoining(new
>> DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate());
>> +
>> +    operator.setAlias("EmployeeBean");
>> +    operator.setup(null);
>> +    operator.activate(null);
>> +    operator.in.process(e);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    String expected = "<EmployeeBean>" + "<name>john</name>" +
>> "<dept>cs</dept>" + "<eid>1</eid>"
>> +        + "<dateOfJoining>2015-01-01</dateOfJoining>" +
>> "</EmployeeBean>";
>> +    Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  @Test
>> +  public void testXmlToPojoWithPrettyPrint()
>> +  {
>> +    EmployeeBean e = new EmployeeBean();
>> +    e.setName("john");
>> +    e.setEid(1);
>> +    e.setDept("cs");
>> +    e.setDateOfJoining(new
>> DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate());
>> +
>> +    operator.setAlias("EmployeeBean");
>> +    operator.setPrettyPrint(true);
>> +    operator.setup(null);
>> +    operator.activate(null);
>> +    operator.in.process(e);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    String expected = "<EmployeeBean>\n" + "  <name>john</name>\n" + "  <dept>cs</dept>\n" + "  <eid>1</eid>\n"
>> +        + "  <dateOfJoining>2015-01-01</dateOfJoining>\n" +
>> "</EmployeeBean>";
>> +    Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  @Test
>> +  public void testPojoToXmlWithoutAliasHeirarchical()
>> +  {
>> +    EmployeeBean e = new EmployeeBean();
>> +    e.setName("john");
>> +    e.setEid(1);
>> +    e.setDept("cs");
>> +    e.setDateOfJoining(new
>> DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate());
>> +    Address address = new Address();
>> +    address.setCity("new york");
>> +    address.setCountry("US");
>> +    e.setAddress(address);
>> +
>> +    operator.setup(null);
>> +    operator.activate(null);
>> +    operator.in.process(e);
>> +    System.out.println(validDataSink.collectedTuples.get(0));
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    String expected =
>> "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>"
>> + "<name>john</name>"
>> +        + "<dept>cs</dept>" + "<eid>1</eid>" +
>> "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>"
>> +        + "<city>new york</city>" + "<country>US</country>" +
>> "</address>"
>> +        +
>> "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>";
>> +    Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  public static class EmployeeBean
>> +  {
>> +
>> +    private String name;
>> +    private String dept;
>> +    private int eid;
>> +    private Date dateOfJoining;
>> +    private Address address;
>> +
>> +    public String getName()
>> +    {
>> +      return name;
>> +    }
>> +
>> +    public void setName(String name)
>> +    {
>> +      this.name = name;
>> +    }
>> +
>> +    public String getDept()
>> +    {
>> +      return dept;
>> +    }
>> +
>> +    public void setDept(String dept)
>> +    {
>> +      this.dept = dept;
>> +    }
>> +
>> +    public int getEid()
>> +    {
>> +      return eid;
>> +    }
>> +
>> +    public void setEid(int eid)
>> +    {
>> +      this.eid = eid;
>> +    }
>> +
>> +    public Date getDateOfJoining()
>> +    {
>> +      return dateOfJoining;
>> +    }
>> +
>> +    public void setDateOfJoining(Date dateOfJoining)
>> +    {
>> +      this.dateOfJoining = dateOfJoining;
>> +    }
>> +
>> +    public Address getAddress()
>> +    {
>> +      return address;
>> +    }
>> +
>> +    public void setAddress(Address address)
>> +    {
>> +      this.address = address;
>> +    }
>> +  }
>> +
>> +  public static class Address
>> +  {
>> +
>> +    private String city;
>> +    private String country;
>> +
>> +    public String getCity()
>> +    {
>> +      return city;
>> +    }
>> +
>> +    public void setCity(String city)
>> +    {
>> +      this.city = city;
>> +    }
>> +
>> +    public String getCountry()
>> +    {
>> +      return country;
>> +    }
>> +
>> +    public void setCountry(String country)
>> +    {
>> +      this.country = country;
>> +    }
>> +
>> +  }
>> +
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java
>> b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java
>> new file mode 100644
>> index 0000000..3c31ad0
>> --- /dev/null
>> +++
>> b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java
>> @@ -0,0 +1,172 @@
>> +package com.datatorrent.contrib.schema.parser;
>> +
>> +import java.util.Date;
>> +
>> +import org.joda.time.DateTime;
>> +import org.junit.Assert;
>> +import org.junit.Rule;
>> +import org.junit.Test;
>> +import org.junit.rules.TestWatcher;
>> +import org.junit.runner.Description;
>> +
>> +import com.datatorrent.contrib.schema.parser.CsvParser;
>> +import com.datatorrent.lib.testbench.CollectorTestSink;
>> +import com.datatorrent.lib.util.TestUtils;
>> +
>> +public class CsvParserTest
>> +{
>> +
>> +  CsvParser operator;
>> +  CollectorTestSink<Object> validDataSink;
>> +  CollectorTestSink<String> invalidDataSink;
>> +
>> +  @Rule
>> +  public Watcher watcher = new Watcher();
>> +
>> +  public class Watcher extends TestWatcher
>> +  {
>> +
>> +    @Override
>> +    protected void starting(Description description)
>> +    {
>> +      super.starting(description);
>> +      operator = new CsvParser();
>> +      operator.setClazz(EmployeeBean.class);
>> +
>> operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
>> +      validDataSink = new CollectorTestSink<Object>();
>> +      invalidDataSink = new CollectorTestSink<String>();
>> +      TestUtils.setSink(operator.out, validDataSink);
>> +      TestUtils.setSink(operator.err, invalidDataSink);
>> +    }
>> +
>> +    @Override
>> +    protected void finished(Description description)
>> +    {
>> +      super.finished(description);
>> +      operator.teardown();
>> +    }
>> +
>> +  }
>> +
>> +  @Test
>> +  public void testCsvToPojoWriterDefault()
>> +  {
>> +    operator.setup(null);
>> +    String tuple = "john,cs,1,01/01/2015";
>> +    operator.in.process(tuple);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    Object obj = validDataSink.collectedTuples.get(0);
>> +    Assert.assertNotNull(obj);
>> +    Assert.assertEquals(EmployeeBean.class, obj.getClass());
>> +    EmployeeBean pojo = (EmployeeBean)obj;
>> +    Assert.assertEquals("john", pojo.getName());
>> +    Assert.assertEquals("cs", pojo.getDept());
>> +    Assert.assertEquals(1, pojo.getEid());
>> +    Assert.assertEquals(new DateTime().withDate(2015, 1,
>> 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
>> +        pojo.getDateOfJoining()));
>> +  }
>> +
>> +  @Test
>> +  public void testCsvToPojoWriterDateFormat()
>> +  {
>> +
>> operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy");
>> +    operator.setup(null);
>> +    String tuple = "john,cs,1,01-JAN-2015";
>> +    operator.in.process(tuple);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    Object obj = validDataSink.collectedTuples.get(0);
>> +    Assert.assertNotNull(obj);
>> +    Assert.assertEquals(EmployeeBean.class, obj.getClass());
>> +    EmployeeBean pojo = (EmployeeBean)obj;
>> +    Assert.assertEquals("john", pojo.getName());
>> +    Assert.assertEquals("cs", pojo.getDept());
>> +    Assert.assertEquals(1, pojo.getEid());
>> +    Assert.assertEquals(new DateTime().withDate(2015, 1,
>> 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
>> +        pojo.getDateOfJoining()));
>> +  }
>> +
>> +  @Test
>> +  public void testCsvToPojoWriterDateFormatMultiple()
>> +  {
>> +
>> operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy,dateOfBirth:date");
>> +    operator.setup(null);
>> +    String tuple = "john,cs,1,01-JAN-2015,01/01/2015";
>> +    operator.in.process(tuple);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    Object obj = validDataSink.collectedTuples.get(0);
>> +    Assert.assertNotNull(obj);
>> +    Assert.assertEquals(EmployeeBean.class, obj.getClass());
>> +    EmployeeBean pojo = (EmployeeBean)obj;
>> +    Assert.assertEquals("john", pojo.getName());
>> +    Assert.assertEquals("cs", pojo.getDept());
>> +    Assert.assertEquals(1, pojo.getEid());
>> +    Assert.assertEquals(new DateTime().withDate(2015, 1,
>> 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
>> +        pojo.getDateOfJoining()));
>> +    Assert.assertEquals(new DateTime().withDate(2015, 1,
>> 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
>> +        pojo.getDateOfBirth()));
>> +  }
>> +
>> +  public static class EmployeeBean
>> +  {
>> +
>> +    private String name;
>> +    private String dept;
>> +    private int eid;
>> +    private Date dateOfJoining;
>> +    private Date dateOfBirth;
>> +
>> +    public String getName()
>> +    {
>> +      return name;
>> +    }
>> +
>> +    public void setName(String name)
>> +    {
>> +      this.name = name;
>> +    }
>> +
>> +    public String getDept()
>> +    {
>> +      return dept;
>> +    }
>> +
>> +    public void setDept(String dept)
>> +    {
>> +      this.dept = dept;
>> +    }
>> +
>> +    public int getEid()
>> +    {
>> +      return eid;
>> +    }
>> +
>> +    public void setEid(int eid)
>> +    {
>> +      this.eid = eid;
>> +    }
>> +
>> +    public Date getDateOfJoining()
>> +    {
>> +      return dateOfJoining;
>> +    }
>> +
>> +    public void setDateOfJoining(Date dateOfJoining)
>> +    {
>> +      this.dateOfJoining = dateOfJoining;
>> +    }
>> +
>> +    public Date getDateOfBirth()
>> +    {
>> +      return dateOfBirth;
>> +    }
>> +
>> +    public void setDateOfBirth(Date dateOfBirth)
>> +    {
>> +      this.dateOfBirth = dateOfBirth;
>> +    }
>> +  }
>> +
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java
>> b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java
>> new file mode 100644
>> index 0000000..b453508
>> --- /dev/null
>> +++
>> b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java
>> @@ -0,0 +1,212 @@
>> +package com.datatorrent.contrib.schema.parser;
>> +
>> +import java.io.ByteArrayOutputStream;
>> +import java.io.File;
>> +import java.io.IOException;
>> +import java.io.PrintStream;
>> +import java.util.Date;
>> +import java.util.List;
>> +
>> +import org.apache.commons.io.FileUtils;
>> +import org.joda.time.DateTime;
>> +import org.junit.Assert;
>> +import org.junit.Rule;
>> +import org.junit.Test;
>> +import org.junit.runner.Description;
>> +
>> +import
>> com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
>> +import com.datatorrent.lib.testbench.CollectorTestSink;
>> +import com.datatorrent.lib.util.TestUtils;
>> +import com.datatorrent.lib.util.TestUtils.TestInfo;
>> +
>> +public class JsonParserTest
>> +{
>> +  JsonParser operator;
>> +  CollectorTestSink<Object> validDataSink;
>> +  CollectorTestSink<String> invalidDataSink;
>> +
>> +  final ByteArrayOutputStream myOut = new ByteArrayOutputStream();
>> +
>> +  public JsonParserTest()
>> +  {
>> +    // So that the output is cleaner.
>> +    System.setErr(new PrintStream(myOut));
>> +  }
>> +
>> +  @Rule
>> +  public TestInfo testMeta = new FSTestWatcher()
>> +  {
>> +    private void deleteDirectory()
>> +    {
>> +      try {
>> +        FileUtils.deleteDirectory(new File(getDir()));
>> +      } catch (IOException ex) {
>> +        throw new RuntimeException(ex);
>> +      }
>> +    }
>> +
>> +    @Override
>> +    protected void starting(Description descriptor)
>> +    {
>> +
>> +      super.starting(descriptor);
>> +      deleteDirectory();
>> +
>> +      operator = new JsonParser();
>> +      operator.setClazz(Test1Pojo.class);
>> +      validDataSink = new CollectorTestSink<Object>();
>> +      invalidDataSink = new CollectorTestSink<String>();
>> +      TestUtils.setSink(operator.out, validDataSink);
>> +      TestUtils.setSink(operator.err, invalidDataSink);
>> +      operator.setup(null);
>> +      operator.activate(null);
>> +
>> +      operator.beginWindow(0);
>> +    }
>> +
>> +    @Override
>> +    protected void finished(Description description)
>> +    {
>> +      operator.endWindow();
>> +      operator.teardown();
>> +
>> +      deleteDirectory();
>> +      super.finished(description);
>> +    }
>> +  };
>> +
>> +  @Test
>> +  public void testJSONToPOJO()
>> +  {
>> +    String tuple =
>> "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
>> +    operator.in.put(tuple);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    Object obj = validDataSink.collectedTuples.get(0);
>> +    Assert.assertNotNull(obj);
>> +    Assert.assertEquals(Test1Pojo.class, obj.getClass());
>> +    Test1Pojo pojo = (Test1Pojo)obj;
>> +    Assert.assertEquals(123, pojo.a);
>> +    Assert.assertEquals(234876274, pojo.b);
>> +    Assert.assertEquals("HowAreYou?", pojo.c);
>> +    Assert.assertEquals(3, pojo.d.size());
>> +    Assert.assertEquals("ABC", pojo.d.get(0));
>> +    Assert.assertEquals("PQR", pojo.d.get(1));
>> +    Assert.assertEquals("XYZ", pojo.d.get(2));
>> +  }
>> +
>> +  @Test
>> +  public void testJSONToPOJODate()
>> +  {
>> +    String tuple =
>> "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}";
>> +    operator.setDateFormat("dd-MM-yyyy");
>> +    operator.setup(null);
>> +    operator.activate(null);
>> +    operator.in.put(tuple);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    Object obj = validDataSink.collectedTuples.get(0);
>> +    Assert.assertNotNull(obj);
>> +    Assert.assertEquals(Test1Pojo.class, obj.getClass());
>> +    Test1Pojo pojo = (Test1Pojo)obj;
>> +    Assert.assertEquals(123, pojo.a);
>> +    Assert.assertEquals(234876274, pojo.b);
>> +    Assert.assertEquals("HowAreYou?", pojo.c);
>> +    Assert.assertEquals(3, pojo.d.size());
>> +    Assert.assertEquals("ABC", pojo.d.get(0));
>> +    Assert.assertEquals("PQR", pojo.d.get(1));
>> +    Assert.assertEquals("XYZ", pojo.d.get(2));
>> +    Assert.assertEquals(2015, new DateTime(pojo.date).getYear());
>> +    Assert.assertEquals(9, new DateTime(pojo.date).getMonthOfYear());
>> +    Assert.assertEquals(15, new DateTime(pojo.date).getDayOfMonth());
>> +  }
>> +
>> +  @Test
>> +  public void testJSONToPOJOInvalidData()
>> +  {
>> +    String tuple = "{\"a\":123\"b\":234876274,\"c\":\"HowAreYou?\"}";
>> +    operator.in.put(tuple);
>> +    Assert.assertEquals(0, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
>> +    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  @Test
>> +  public void testJSONToPOJOUnknownFields()
>> +  {
>> +    String tuple =
>> "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"asd\":433.6}";
>> +    operator.in.put(tuple);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    Object obj = validDataSink.collectedTuples.get(0);
>> +    Assert.assertNotNull(obj);
>> +    Assert.assertEquals(Test1Pojo.class, obj.getClass());
>> +    Test1Pojo pojo = (Test1Pojo)obj;
>> +    Assert.assertEquals(123, pojo.a);
>> +    Assert.assertEquals(234876274, pojo.b);
>> +    Assert.assertEquals("HowAreYou?", pojo.c);
>> +    Assert.assertEquals(null, pojo.d);
>> +  }
>> +
>> +  @Test
>> +  public void testJSONToPOJOMismatchingFields()
>> +  {
>> +    String tuple =
>> "{\"a\":123,\"c\":234876274,\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
>> +    operator.in.put(tuple);
>> +    Assert.assertEquals(0, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
>> +    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  @Test
>> +  public void testJSONToPOJOEmptyString()
>> +  {
>> +    String tuple = "";
>> +    operator.in.put(tuple);
>> +    Assert.assertEquals(0, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
>> +    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  @Test
>> +  public void testJSONToPOJOEmptyJSON()
>> +  {
>> +    String tuple = "{}";
>> +    operator.in.put(tuple);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    Object obj = validDataSink.collectedTuples.get(0);
>> +    Assert.assertNotNull(obj);
>> +    Assert.assertEquals(Test1Pojo.class, obj.getClass());
>> +    Test1Pojo pojo = (Test1Pojo)obj;
>> +    Assert.assertEquals(0, pojo.a);
>> +    Assert.assertEquals(0, pojo.b);
>> +    Assert.assertEquals(null, pojo.c);
>> +    Assert.assertEquals(null, pojo.d);
>> +  }
>> +
>> +  @Test
>> +  public void testJSONToPOJOArrayInJson()
>> +  {
>> +    String tuple =
>> "{\"a\":123,\"c\":[234,65,23],\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
>> +    operator.in.put(tuple);
>> +    Assert.assertEquals(0, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
>> +    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  public static class Test1Pojo
>> +  {
>> +    public int a;
>> +    public long b;
>> +    public String c;
>> +    public List<String> d;
>> +    public Date date;
>> +
>> +    @Override
>> +    public String toString()
>> +    {
>> +      return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]";
>> +    }
>> +  }
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java
>> b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java
>> new file mode 100644
>> index 0000000..4298951
>> --- /dev/null
>> +++
>> b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java
>> @@ -0,0 +1,254 @@
>> +package com.datatorrent.contrib.schema.parser;
>> +
>> +import java.util.Date;
>> +
>> +import org.joda.time.DateTime;
>> +import org.junit.Assert;
>> +import org.junit.Rule;
>> +import org.junit.Test;
>> +import org.junit.rules.TestWatcher;
>> +import org.junit.runner.Description;
>> +
>> +import com.datatorrent.lib.testbench.CollectorTestSink;
>> +import com.datatorrent.lib.util.TestUtils;
>> +
>> +public class XmlParserTest
>> +{
>> +  XmlParser operator;
>> +  CollectorTestSink<Object> validDataSink;
>> +  CollectorTestSink<String> invalidDataSink;
>> +
>> +  @Rule
>> +  public Watcher watcher = new Watcher();
>> +
>> +  public class Watcher extends TestWatcher
>> +  {
>> +
>> +    @Override
>> +    protected void starting(Description description)
>> +    {
>> +      super.starting(description);
>> +      operator = new XmlParser();
>> +      operator.setClazz(EmployeeBean.class);
>> +      operator.setDateFormats("yyyy-MM-dd"); //setting default date pattern
>> +      validDataSink = new CollectorTestSink<Object>();
>> +      invalidDataSink = new CollectorTestSink<String>();
>> +      TestUtils.setSink(operator.out, validDataSink);
>> +      TestUtils.setSink(operator.err, invalidDataSink);
>> +    }
>> +
>> +    @Override
>> +    protected void finished(Description description)
>> +    {
>> +      super.finished(description);
>> +      operator.teardown();
>> +    }
>> +
>> +  }
>> +
>> +  @Test
>> +  public void testXmlToPojoWithoutAlias()
>> +  {
>> +    String tuple =
>> "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" +
>> "<name>john</name>"
>> +        + "<dept>cs</dept>" + "<eid>1</eid>" +
>> "<dateOfJoining>2015-01-01</dateOfJoining>"
>> +        +
>> "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>";
>> +
>> +    operator.setup(null);
>> +    operator.activate(null);
>> +    operator.in.process(tuple);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    Object obj = validDataSink.collectedTuples.get(0);
>> +    Assert.assertNotNull(obj);
>> +    Assert.assertEquals(EmployeeBean.class, obj.getClass());
>> +    EmployeeBean pojo = (EmployeeBean)obj;
>> +    Assert.assertEquals("john", pojo.getName());
>> +    Assert.assertEquals("cs", pojo.getDept());
>> +    Assert.assertEquals(1, pojo.getEid());
>> +    Assert.assertEquals(2015, new
>> DateTime(pojo.getDateOfJoining()).getYear());
>> +    Assert.assertEquals(1, new
>> DateTime(pojo.getDateOfJoining()).getMonthOfYear());
>> +    Assert.assertEquals(1, new
>> DateTime(pojo.getDateOfJoining()).getDayOfMonth());
>> +  }
>> +
>> +  @Test
>> +  public void testXmlToPojoWithAliasDateFormat()
>> +  {
>> +    String tuple = "<EmployeeBean>" + "<name>john</name>" +
>> "<dept>cs</dept>" + "<eid>1</eid>"
>> +        + "<dateOfJoining>2015-JAN-01</dateOfJoining>" +
>> "</EmployeeBean>";
>> +
>> +    operator.setAlias("EmployeeBean");
>> +    operator.setDateFormats("yyyy-MM-dd,yyyy-MMM-dd");
>> +    operator.setup(null);
>> +    operator.activate(null);
>> +    operator.in.process(tuple);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    Object obj = validDataSink.collectedTuples.get(0);
>> +    Assert.assertNotNull(obj);
>> +    Assert.assertEquals(EmployeeBean.class, obj.getClass());
>> +    EmployeeBean pojo = (EmployeeBean)obj;
>> +    Assert.assertEquals("john", pojo.getName());
>> +    Assert.assertEquals("cs", pojo.getDept());
>> +    Assert.assertEquals(1, pojo.getEid());
>> +    Assert.assertEquals(2015, new
>> DateTime(pojo.getDateOfJoining()).getYear());
>> +    Assert.assertEquals(1, new
>> DateTime(pojo.getDateOfJoining()).getMonthOfYear());
>> +    Assert.assertEquals(1, new
>> DateTime(pojo.getDateOfJoining()).getDayOfMonth());
>> +  }
>> +
>> +  @Test
>> +  public void testXmlToPojoWithAlias()
>> +  {
>> +    String tuple = "<EmployeeBean>" + "<name>john</name>" +
>> "<dept>cs</dept>" + "<eid>1</eid>"
>> +        + "<dateOfJoining>2015-01-01</dateOfJoining>" +
>> "</EmployeeBean>";
>> +
>> +    operator.setAlias("EmployeeBean");
>> +    operator.setup(null);
>> +    operator.activate(null);
>> +    operator.in.process(tuple);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    Object obj = validDataSink.collectedTuples.get(0);
>> +    Assert.assertNotNull(obj);
>> +    Assert.assertEquals(EmployeeBean.class, obj.getClass());
>> +    EmployeeBean pojo = (EmployeeBean)obj;
>> +    Assert.assertEquals("john", pojo.getName());
>> +    Assert.assertEquals("cs", pojo.getDept());
>> +    Assert.assertEquals(1, pojo.getEid());
>> +    Assert.assertEquals(2015, new
>> DateTime(pojo.getDateOfJoining()).getYear());
>> +    Assert.assertEquals(1, new
>> DateTime(pojo.getDateOfJoining()).getMonthOfYear());
>> +    Assert.assertEquals(1, new
>> DateTime(pojo.getDateOfJoining()).getDayOfMonth());
>> +  }
>> +
>> +  @Test
>> +  public void testXmlToPojoIncorrectXML()
>> +  {
>> +    String tuple = "<EmployeeBean>"
>> +        + "<firstname>john</firstname>" //incorrect field name
>> +        + "<dept>cs</dept>" + "<eid>1</eid>" +
>> "<dateOfJoining>2015-01-01 00:00:00.00 IST</dateOfJoining>"
>> +        + "</EmployeeBean>";
>> +
>> +    operator.setAlias("EmployeeBean");
>> +    operator.setup(null);
>> +    operator.activate(null);
>> +    operator.in.process(tuple);
>> +    Assert.assertEquals(0, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
>> +    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
>> +  }
>> +
>> +  @Test
>> +  public void testXmlToPojoWithoutAliasHeirarchical()
>> +  {
>> +    String tuple =
>> "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" +
>> "<name>john</name>"
>> +        + "<dept>cs</dept>" + "<eid>1</eid>" +
>> "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>"
>> +        + "<city>new york</city>" + "<country>US</country>" +
>> "</address>"
>> +        +
>> "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>";
>> +
>> +    operator.setup(null);
>> +    operator.activate(null);
>> +    operator.in.process(tuple);
>> +    Assert.assertEquals(1, validDataSink.collectedTuples.size());
>> +    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
>> +    Object obj = validDataSink.collectedTuples.get(0);
>> +    Assert.assertNotNull(obj);
>> +    Assert.assertEquals(EmployeeBean.class, obj.getClass());
>> +    EmployeeBean pojo = (EmployeeBean)obj;
>> +    Assert.assertEquals("john", pojo.getName());
>> +    Assert.assertEquals("cs", pojo.getDept());
>> +    Assert.assertEquals(1, pojo.getEid());
>> +    Assert.assertEquals(Address.class, pojo.getAddress().getClass());
>> +    Assert.assertEquals("new york", pojo.getAddress().getCity());
>> +    Assert.assertEquals("US", pojo.getAddress().getCountry());
>> +    Assert.assertEquals(2015, new
>> DateTime(pojo.getDateOfJoining()).getYear());
>> +    Assert.assertEquals(1, new
>> DateTime(pojo.getDateOfJoining()).getMonthOfYear());
>> +    Assert.assertEquals(1, new
>> DateTime(pojo.getDateOfJoining()).getDayOfMonth());
>> +  }
>> +
>> +  public static class EmployeeBean
>> +  {
>> +
>> +    private String name;
>> +    private String dept;
>> +    private int eid;
>> +    private Date dateOfJoining;
>> +    private Address address;
>> +
>> +    public String getName()
>> +    {
>> +      return name;
>> +    }
>> +
>> +    public void setName(String name)
>> +    {
>> +      this.name = name;
>> +    }
>> +
>> +    public String getDept()
>> +    {
>> +      return dept;
>> +    }
>> +
>> +    public void setDept(String dept)
>> +    {
>> +      this.dept = dept;
>> +    }
>> +
>> +    public int getEid()
>> +    {
>> +      return eid;
>> +    }
>> +
>> +    public void setEid(int eid)
>> +    {
>> +      this.eid = eid;
>> +    }
>> +
>> +    public Date getDateOfJoining()
>> +    {
>> +      return dateOfJoining;
>> +    }
>> +
>> +    public void setDateOfJoining(Date dateOfJoining)
>> +    {
>> +      this.dateOfJoining = dateOfJoining;
>> +    }
>> +
>> +    public Address getAddress()
>> +    {
>> +      return address;
>> +    }
>> +
>> +    public void setAddress(Address address)
>> +    {
>> +      this.address = address;
>> +    }
>> +  }
>> +
>> +  public static class Address
>> +  {
>> +
>> +    private String city;
>> +    private String country;
>> +
>> +    public String getCity()
>> +    {
>> +      return city;
>> +    }
>> +
>> +    public void setCity(String city)
>> +    {
>> +      this.city = city;
>> +    }
>> +
>> +    public String getCountry()
>> +    {
>> +      return country;
>> +    }
>> +
>> +    public void setCountry(String country)
>> +    {
>> +      this.country = country;
>> +    }
>> +  }
>> +
>> +}
>>
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message