apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chin...@apache.org
Subject incubator-apex-malhar git commit: APEXMALHAR-2011-2012 read & convert Avro to Pojo
Date Fri, 01 Apr 2016 09:40:33 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master a23cc5b8f -> 5075ce0ef


APEXMALHAR-2011-2012 read & convert Avro to Pojo


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5075ce0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5075ce0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5075ce0e

Branch: refs/heads/master
Commit: 5075ce0ef75afccdff2edf4c044465340176a148
Parents: a23cc5b
Author: Devendra Tagare <devtagare@Devendras-MacBook-Pro.local>
Authored: Tue Mar 15 20:09:06 2016 +0530
Committer: devtagare <devtagare@gmail.com>
Committed: Thu Mar 31 12:45:14 2016 -0700

----------------------------------------------------------------------
 .../contrib/avro/AvroFileInputOperator.java     | 168 +++++++
 .../contrib/avro/AvroRecordHelper.java          | 123 +++++
 .../datatorrent/contrib/avro/AvroToPojo.java    | 411 +++++++++++++++++
 .../datatorrent/contrib/avro/PojoToAvro.java    | 273 +++++++++++
 .../contrib/avro/AvroFileInputOperatorTest.java | 448 +++++++++++++++++++
 .../contrib/avro/AvroToPojoTest.java            | 325 ++++++++++++++
 .../contrib/avro/PojoToAvroTest.java            | 236 ++++++++++
 7 files changed, 1984 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5075ce0e/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
new file mode 100644
index 0000000..14dfdf2
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
+
+/**
+ * <p>
+ * Avro File Input Operator
+ * </p>
+ * A specific implementation of the AbstractFileInputOperator to read Avro
+ * container files.<br>
+ * No need to provide a schema; it's inferred from the file<br>
+ * This operator emits a GenericRecord based on the schema derived from the
+ * input file<br>
+ * Users can add the {@link IdempotentStorageManager.FSIdempotentStorageManager}
+ * to ensure exactly once semantics with a HDFS backed WAL.
+ * 
+ * @displayName AvroFileInputOperator
+ * @category Input
+ * @tags fs, file,avro, input operator
+ */
+@InterfaceStability.Evolving
+public class AvroFileInputOperator extends AbstractFileInputOperator<GenericRecord>
+{
+
+  private transient long offset = 0L;
+
+  @AutoMetric
+  @VisibleForTesting
+  int recordCount = 0;
+
+  @AutoMetric
+  @VisibleForTesting
+  int errorCount = 0;
+
+  private transient DataFileStream<GenericRecord> avroDataStream;
+
+  public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<GenericRecord>();
+
+  public final transient DefaultOutputPort<String> completedFilesPort = new DefaultOutputPort<String>();
+
+  public final transient DefaultOutputPort<String> errorRecordsPort = new DefaultOutputPort<String>();
+
+  /**
+   * Returns an input stream given a file path
+   * 
+   * @param path
+   * @return InputStream
+   * @throws IOException
+   */
+  @Override
+  protected InputStream openFile(Path path) throws IOException
+  {
+    InputStream is = super.openFile(path);
+    if (is != null) {
+      DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+      avroDataStream = new DataFileStream<GenericRecord>(is, datumReader);
+      datumReader.setSchema(avroDataStream.getSchema());
+    }
+    return is;
+  }
+
+  /**
+   * Reads a GenericRecord from the given input stream<br>
+   * Emits the FileName, Offset, Exception on the error port if it's connected
+   * 
+   * @return GenericRecord
+   */
+  @Override
+  protected GenericRecord readEntity() throws IOException
+  {
+    GenericRecord record = null;
+
+    record = null;
+
+    try {
+      if (avroDataStream != null && avroDataStream.hasNext()) {
+        offset++;
+        record = avroDataStream.next();
+        recordCount++;
+        return record;
+      }
+    } catch (AvroRuntimeException are) {
+      LOG.error("Exception in parsing record for file - " + super.currentFile + " at offset - " + offset, are);
+      if (errorRecordsPort.isConnected()) {
+        errorRecordsPort.emit("FileName:" + super.currentFile + ", Offset:" + offset);
+      }
+      errorCount++;
+      throw new AvroRuntimeException(are);
+    }
+    return record;
+  }
+
+  /**
+   * Closes the input stream If the completed files port is connected, the
+   * completed file is emitted from this port
+   */
+  @Override
+  protected void closeFile(InputStream is) throws IOException
+  {
+    String fileName = super.currentFile;
+
+    if (avroDataStream != null) {
+      avroDataStream.close();
+    }
+
+    super.closeFile(is);
+
+    if (completedFilesPort.isConnected()) {
+      completedFilesPort.emit(fileName);
+    }
+    offset = 0;
+  }
+
+  @Override
+  protected void emit(GenericRecord tuple)
+  {
+    if (tuple != null) {
+      output.emit(tuple);
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    errorCount = 0;
+    recordCount = 0;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroFileInputOperator.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5075ce0e/contrib/src/main/java/com/datatorrent/contrib/avro/AvroRecordHelper.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroRecordHelper.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroRecordHelper.java
new file mode 100644
index 0000000..8f29b86
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroRecordHelper.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import java.text.ParseException;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+
+/**
+ * This is a utility class for reading Avro converted records.<br>
+ * This class can be used with the {@link PojoToAvro} or in isolation to get Avro values.
+ */
+public class AvroRecordHelper
+{
+
+  /**
+   * Convert a passed String value to the given type for the key as per Schema
+   */
+  public static Object convertValueStringToAvroKeyType(Schema schema, String key, String value) throws ParseException
+  {
+    Type type = null;
+
+    if (schema.getField(key) != null) {
+      type = schema.getField(key).schema().getType();
+    } else {
+      return value;
+    }
+
+    Object convertedValue = null;
+
+    if (type == Type.UNION) {
+      convertedValue = convertAndResolveUnionToPrimitive(schema, key, value);
+    } else {
+      convertedValue = convertValueToAvroPrimitive(type, key, value);
+    }
+
+    return convertedValue;
+
+  }
+
+  private static Object convertValueToAvroPrimitive(Type type, String key, String value) throws ParseException
+  {
+    Object newValue = value;
+    switch (type) {
+      case BOOLEAN:
+        newValue = Boolean.parseBoolean(value);
+        break;
+      case DOUBLE:
+        newValue = Double.parseDouble(value);
+        break;
+      case FLOAT:
+        newValue = Float.parseFloat(value);
+        break;
+      case INT:
+        newValue = Integer.parseInt(value);
+        break;
+      case LONG:
+        newValue = Long.parseLong(value);
+        break;
+      case BYTES:
+        newValue = value.getBytes();
+        break;
+      case STRING:
+        newValue = value;
+        break;
+      case NULL:
+        newValue = null;
+        break;
+      default:
+        newValue = value;
+    }
+    return newValue;
+  }
+
+  private static Object convertAndResolveUnionToPrimitive(Schema schema, String key, String value) throws ParseException
+  {
+    Schema unionSchema = schema.getField(key).schema();
+    List<Schema> types = unionSchema.getTypes();
+    Object convertedValue = null;
+    for (int i = 0; i < types.size(); i++) {
+      try {
+        if (types.get(i).getType() == Type.NULL) {
+          if (value == null || value.equals("null")) {
+            convertedValue = null;
+            break;
+          } else {
+            continue;
+          }
+        }
+        convertedValue = convertValueToAvroPrimitive(types.get(i).getType(), key, value);
+      } catch (RuntimeException e) {
+        LOG.error("Could not handle schema resolution", e);
+        continue;
+      }
+      break;
+    }
+
+    return convertedValue;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroRecordHelper.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5075ce0e/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
new file mode 100644
index 0000000..7fa0936
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
@@ -0,0 +1,411 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * <p>
+ * AvroToPojo
+ * </p>
+ * A generic implementation for conversion from Avro to POJO. The POJO class
+ * name & field mapping should be provided by the user.<br>
+ * If this mapping is not provided then reflection is used to determine this
+ * mapping.<br>
+ * As of now only primitive types are supported.<br>
+ * Error records are emitted on the errorPort if connected
+ *
+ * @displayName Avro To Pojo
+ * @category Converter
+ * @tags avro
+ */
+@InterfaceStability.Evolving
+public class AvroToPojo extends BaseOperator
+{
+
+  private transient Class<?> pojoClass;
+
+  private static final String FIELD_SEPARATOR = ":";
+  private static final String RECORD_SEPARATOR = ",";
+
+  private String genericRecordToPOJOFieldsMapping = null;
+
+  private List<FieldInfo> fieldInfos;
+
+  private List<ActiveFieldInfo> columnFieldSetters;
+
+  @AutoMetric
+  @VisibleForTesting
+  int recordCount = 0;
+
+  @AutoMetric
+  @VisibleForTesting
+  int errorCount = 0;
+
+  @AutoMetric
+  @VisibleForTesting
+  int fieldErrorCount = 0;
+
+  public final transient DefaultOutputPort<GenericRecord> errorPort = new DefaultOutputPort<GenericRecord>();
+
+  /**
+   * Returns a string representing mapping from generic record to POJO fields
+   */
+  public String getGenericRecordToPOJOFieldsMapping()
+  {
+    return genericRecordToPOJOFieldsMapping;
+  }
+
+  /**
+   * Comma separated list mapping a field in Avro schema to POJO field eg :
+   * orderId:orderId:INTEGER,total:total:DOUBLE
+   */
+  public void setGenericRecordToPOJOFieldsMapping(String genericRecordToPOJOFieldsMapping)
+  {
+    this.genericRecordToPOJOFieldsMapping = genericRecordToPOJOFieldsMapping;
+  }
+
+  public final transient DefaultInputPort<GenericRecord> data = new DefaultInputPort<GenericRecord>()
+  {
+    @Override
+    public void process(GenericRecord tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * Converts the given Generic Record to a POJO and emits it
+   */
+  protected void processTuple(GenericRecord tuple)
+  {
+    try {
+      Object obj = getPOJOFromGenericRecord(tuple);
+
+      if (obj != null) {
+        output.emit(obj);
+        recordCount++;
+      } else if (errorPort.isConnected()) {
+        errorPort.emit(tuple);
+        errorCount++;
+      }
+
+    } catch (InstantiationException | IllegalAccessException e) {
+      LOG.error("Could not initialize object of class -" + getClass().getName(), e);
+      errorCount++;
+    }
+  }
+
+  /**
+   * Returns a POJO from a Generic Record
+   * 
+   * @return Object
+   */
+  @SuppressWarnings("unchecked")
+  private Object getPOJOFromGenericRecord(GenericRecord tuple) throws InstantiationException, IllegalAccessException
+  {
+    Object newObj = getPojoClass().newInstance();
+
+    try {
+      for (int i = 0; i < columnFieldSetters.size(); i++) {
+
+        AvroToPojo.ActiveFieldInfo afi = columnFieldSetters.get(i);
+        SupportType st = afi.fieldInfo.getType();
+        Object val = null;
+
+        try {
+          val = tuple.get(afi.fieldInfo.getColumnName());
+        } catch (Exception e) {
+          LOG.error("Could not find field -" + afi.fieldInfo.getColumnName() + "- in the generic record", e);
+          val = null;
+          fieldErrorCount++;
+        }
+
+        if (val == null) {
+          continue;
+        }
+
+        try {
+          switch (st) {
+            case BOOLEAN:
+              ((PojoUtils.SetterBoolean<Object>)afi.setterOrGetter).set(newObj,
+                  (boolean)tuple.get(afi.fieldInfo.getColumnName()));
+              break;
+
+            case DOUBLE:
+              ((PojoUtils.SetterDouble<Object>)afi.setterOrGetter).set(newObj,
+                  (double)tuple.get(afi.fieldInfo.getColumnName()));
+              break;
+
+            case FLOAT:
+              ((PojoUtils.SetterFloat<Object>)afi.setterOrGetter).set(newObj,
+                  (float)tuple.get(afi.fieldInfo.getColumnName()));
+              break;
+
+            case INTEGER:
+              ((PojoUtils.SetterInt<Object>)afi.setterOrGetter).set(newObj,
+                  (int)tuple.get(afi.fieldInfo.getColumnName()));
+              break;
+
+            case STRING:
+              ((PojoUtils.Setter<Object, String>)afi.setterOrGetter).set(newObj,
+                  new String(tuple.get(afi.fieldInfo.getColumnName()).toString()));
+              break;
+
+            case LONG:
+              ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(newObj,
+                  (long)tuple.get(afi.fieldInfo.getColumnName()));
+              break;
+
+            default:
+              throw new AvroRuntimeException("Invalid Support Type");
+
+          }
+        } catch (AvroRuntimeException e) {
+          LOG.error("Exception in setting value", e);
+          fieldErrorCount++;
+        }
+
+      }
+    } catch (Exception ex) {
+      LOG.error("Generic Exception in setting value" + ex.getMessage());
+      errorCount++;
+      newObj = null;
+    }
+    return newObj;
+  }
+
+  /**
+   * Use reflection to generate field info values if the user has not provided
+   * the inputs mapping
+   * 
+   * @return String representing the POJO field to Avro field mapping
+   */
+  private String generateFieldInfoInputs(Class<?> cls)
+  {
+    java.lang.reflect.Field[] fields = cls.getDeclaredFields();
+    StringBuilder sb = new StringBuilder();
+
+    for (int i = 0; i < fields.length; i++) {
+      java.lang.reflect.Field f = fields[i];
+      Class<?> c = ClassUtils.primitiveToWrapper(f.getType());
+      sb.append(f.getName()).append(FIELD_SEPARATOR).append(f.getName()).append(FIELD_SEPARATOR)
+          .append(c.getSimpleName().toUpperCase()).append(RECORD_SEPARATOR);
+    }
+    return sb.substring(0, sb.length() - 1);
+  }
+
+  /**
+   * Creates a map representing fieldName in POJO:field in Generic Record:Data
+   * type
+   * 
+   * @return List of FieldInfo
+   */
+  private List<FieldInfo> createFieldInfoMap(String str)
+  {
+    fieldInfos = new ArrayList<FieldInfo>();
+    StringTokenizer strtok = new StringTokenizer(str, RECORD_SEPARATOR);
+
+    while (strtok.hasMoreTokens()) {
+      String[] token = strtok.nextToken().split(FIELD_SEPARATOR);
+      try {
+        fieldInfos.add(new FieldInfo(token[0], token[1], SupportType.valueOf(token[2])));
+      } catch (Exception e) {
+        LOG.error("Invalid support type", e);
+      }
+    }
+    return fieldInfos;
+  }
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
+  {
+    public void setup(PortContext context)
+    {
+      setPojoClass(context.getValue(Context.PortContext.TUPLE_CLASS));
+
+      columnFieldSetters = Lists.newArrayList();
+
+      /**
+       * Check if the mapping of Generic record fields to POJO is given, else
+       * use reflection
+       */
+      if (getGenericRecordToPOJOFieldsMapping() == null) {
+        setFieldInfos(createFieldInfoMap(generateFieldInfoInputs(getPojoClass())));
+      } else {
+        setFieldInfos(createFieldInfoMap(getGenericRecordToPOJOFieldsMapping()));
+      }
+
+      initColumnFieldSetters(getFieldInfos());
+      initializeActiveFieldSetters();
+    }
+  };
+
+  @Override
+  public void endWindow()
+  {
+    errorCount = 0;
+    fieldErrorCount = 0;
+    recordCount = 0;
+
+  }
+
+  private Class<?> getPojoClass()
+  {
+    return pojoClass;
+  }
+
+  public void setPojoClass(Class<?> pojoClass)
+  {
+    this.pojoClass = pojoClass;
+  }
+
+  /**
+   * Class that maps fieldInfo to its getters or setters
+   */
+  protected static class ActiveFieldInfo
+  {
+    final FieldInfo fieldInfo;
+    Object setterOrGetter;
+
+    ActiveFieldInfo(FieldInfo fieldInfo)
+    {
+      this.fieldInfo = fieldInfo;
+    }
+
+  }
+
+  /**
+   * A list of {@link FieldInfo}s where each item maps a column name to a pojo
+   * field name.
+   */
+  private List<FieldInfo> getFieldInfos()
+  {
+    return fieldInfos;
+  }
+
+  /**
+   * Add the Active Fields to the columnFieldSetters {@link ActiveFieldInfo}s
+   */
+  private void initColumnFieldSetters(List<FieldInfo> fieldInfos)
+  {
+    for (FieldInfo fi : fieldInfos) {
+      if (columnFieldSetters == null) {
+        columnFieldSetters = Lists.newArrayList();
+      }
+      columnFieldSetters.add(new AvroToPojo.ActiveFieldInfo(fi));
+    }
+  }
+
+  /**
+   * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a
+   * pojo field name.<br/>
+   * The value from fieldInfo.column is assigned to
+   * fieldInfo.pojoFieldExpression.
+   *
+   * @description $[].columnName name of the Output Field in POJO
+   * @description $[].pojoFieldExpression expression to get the respective field
+   *              from generic record
+   * @useSchema $[].pojoFieldExpression outputPort.fields[].name
+   */
+  private void setFieldInfos(List<FieldInfo> fieldInfos)
+  {
+    this.fieldInfos = fieldInfos;
+  }
+
+  /**
+   * Initialize the setters for generating the POJO
+   */
+  private void initializeActiveFieldSetters()
+  {
+    for (int i = 0; i < columnFieldSetters.size(); i++) {
+      ActiveFieldInfo activeFieldInfo = columnFieldSetters.get(i);
+
+      SupportType st = activeFieldInfo.fieldInfo.getType();
+
+      switch (st) {
+
+        case BOOLEAN:
+
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(getPojoClass(),
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
+        case DOUBLE:
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterDouble(getPojoClass(),
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
+        case FLOAT:
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(getPojoClass(),
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
+        case INTEGER:
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(getPojoClass(),
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
+        case STRING:
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(getPojoClass(),
+              activeFieldInfo.fieldInfo.getPojoFieldExpression(), activeFieldInfo.fieldInfo.getType().getJavaType());
+          break;
+
+        case LONG:
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(getPojoClass(),
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
+        default:
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(getPojoClass(),
+              activeFieldInfo.fieldInfo.getPojoFieldExpression(), Byte.class);
+          break;
+      }
+
+      columnFieldSetters.get(i).setterOrGetter = activeFieldInfo.setterOrGetter;
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroToPojo.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5075ce0e/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java b/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
new file mode 100644
index 0000000..dc90800
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+
+/**
+ * <p>
+ * PojoToAvro
+ * </p>
+ * A generic implementation for POJO to Avro conversion. A POJO is converted to
+ * a GenericRecord based on the schema provided.<br>
+ * As of now only primitive types are supported.<br>
+ * Error records are emitted on the errorPort if connected
+ *
+ * @displayName Pojo To Avro
+ * @category Converter
+ * @tags avro
+ */
+@InterfaceStability.Evolving
+public class PojoToAvro extends BaseOperator
+{
+
+  private List<Field> columnNames;
+
+  private Class<?> cls;
+
+  private List<Getter> keyMethodMap;
+
+  private transient String schemaString;
+
+  private transient Schema schema;
+
+  @AutoMetric
+  @VisibleForTesting
+  int recordCount = 0;
+
+  @AutoMetric
+  @VisibleForTesting
+  int errorCount = 0;
+
+  @AutoMetric
+  @VisibleForTesting
+  int fieldErrorCount = 0;
+
+  public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<GenericRecord>();
+
+  public final transient DefaultOutputPort<Object> errorPort = new DefaultOutputPort<Object>();
+
+  private void parseSchema() throws IOException
+  {
+    setSchema(new Schema.Parser().parse(getSchemaString()));
+  }
+
+  /**
+   * Returns the schema string for Avro Generic Record
+   * 
+   * @return schemaString
+   */
+  public String getSchemaString()
+  {
+    return schemaString;
+  }
+
+  /**
+   * Sets the schema string
+   */
+  public void setSchemaString(String schemaString)
+  {
+    this.schemaString = schemaString;
+  }
+
+  /**
+   * Returns the schema object
+   * 
+   * @return schema
+   */
+  private Schema getSchema()
+  {
+    return schema;
+  }
+
+  /**
+   * Sets the schema object
+   */
+  private void setSchema(Schema schema)
+  {
+    this.schema = schema;
+  }
+
+  /**
+   * Returns the list for field names from provided Avro schema
+   * 
+   * @return List of Fields
+   */
+  private List<Field> getColumnNames()
+  {
+    return columnNames;
+  }
+
+  /**
+   * Sets the list of column names representing the fields in Avro schema
+   */
+  private void setColumnNames(List<Field> columnNames)
+  {
+    this.columnNames = columnNames;
+  }
+
+  /**
+   * This method generates the getters for provided field of a given class
+   * 
+   * @return Getter
+   */
+  private Getter<?, ?> generateGettersForField(Class<?> cls, String inputFieldName)
+      throws NoSuchFieldException, SecurityException
+  {
+    java.lang.reflect.Field f = cls.getDeclaredField(inputFieldName);
+    Class<?> c = ClassUtils.primitiveToWrapper(f.getType());
+
+    Getter<?, ?> classGetter = PojoUtils.createGetter(cls, inputFieldName, c);
+
+    return classGetter;
+  }
+
+  /**
+   * Initializes the list of columns in POJO based on the names from schema
+   */
+  private void initializeColumnMap(Schema schema)
+  {
+    setColumnNames(schema.getFields());
+
+    keyMethodMap = new ArrayList<Getter>();
+    for (int i = 0; i < getColumnNames().size(); i++) {
+      try {
+        keyMethodMap.add(generateGettersForField(cls, getColumnNames().get(i).name()));
+      } catch (NoSuchFieldException | SecurityException e) {
+        throw new RuntimeException("Failed to initialize pojo class getters for field - ", e);
+      }
+    }
+  }
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultInputPort<Object> data = new DefaultInputPort<Object>()
+  {
+
+    @Override
+    public void setup(PortContext context)
+    {
+      cls = context.getValue(Context.PortContext.TUPLE_CLASS);
+
+      try {
+        parseSchema();
+        initializeColumnMap(getSchema());
+      } catch (IOException e) {
+        LOG.error("Exception in parsing schema", e);
+      }
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple);
+    }
+
+  };
+
+  /**
+   * Converts incoming tuples into Generic records
+   */
+  protected void processTuple(Object tuple)
+  {
+    GenericRecord record = null;
+
+    try {
+      record = getGenericRecord(tuple);
+    } catch (Exception e) {
+      LOG.error("Exception in parsing record");
+      errorCount++;
+    }
+
+    if (record != null) {
+      output.emit(record);
+      recordCount++;
+    } else if (errorPort.isConnected()) {
+      errorPort.emit(tuple);
+      errorCount++;
+    }
+  }
+
+  /**
+   * Returns a generic record mapping the POJO fields to provided schema
+   * 
+   * @return Generic Record
+   */
+  private GenericRecord getGenericRecord(Object tuple) throws Exception
+  {
+    int writeErrorCount = 0;
+    GenericRecord rec = new GenericData.Record(getSchema());
+
+    for (int i = 0; i < columnNames.size(); i++) {
+      try {
+        rec.put(columnNames.get(i).name(), AvroRecordHelper.convertValueStringToAvroKeyType(getSchema(),
+            columnNames.get(i).name(), keyMethodMap.get(i).get(tuple).toString()));
+      } catch (AvroRuntimeException e) {
+        LOG.error("Could not set Field [" + columnNames.get(i).name() + "] in the generic record", e);
+        fieldErrorCount++;
+      } catch (Exception e) {
+        LOG.error("Parse Exception", e);
+        fieldErrorCount++;
+        writeErrorCount++;
+      }
+    }
+
+    if (columnNames.size() == writeErrorCount) {
+      errorCount++;
+      return null;
+    } else {
+      return rec;
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    recordCount = 0;
+    errorCount = 0;
+    fieldErrorCount = 0;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(PojoToAvro.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5075ce0e/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java
new file mode 100644
index 0000000..813f189
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java
@@ -0,0 +1,448 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.python.google.common.collect.Lists;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.helper.TestPortContext;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * <p>
+ * In this class the emitTuples method is called twice to process the first
+ * input, since on begin window 0 the operator is set up and the stream is
+ * initialized. The platform calls the emitTuples method in the successive
+ * windows.
+ * </p>
+ */
+public class AvroFileInputOperatorTest
+{
+
+  // Order record schema: long orderId, int customerId, double total, string customerName.
+  private static final String AVRO_SCHEMA = "{\"namespace\":\"abc\"," + ""
+      + "\"type\":\"record\",\"doc\":\"Order schema\"," + "\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\","
+      + "\"type\": \"long\"}," + "{\"name\":\"customerId\",\"type\": \"int\"},"
+      + "{\"name\":\"total\",\"type\": \"double\"}," + "{\"name\":\"customerName\",\"type\": \"string\"}]}";
+
+  // NOTE(review): fixed /tmp file names could collide if tests run in parallel;
+  // files are written here and then moved into the per-test directory.
+  private static final String FILENAME = "/tmp/simpleorder.avro";
+  private static final String OTHER_FILE = "/tmp/simpleorder2.avro";
+  private static final String ERROR_FILE = "/tmp/errorFile.avro";
+
+  // Captures GenericRecords emitted on the operator's output port.
+  CollectorTestSink<Object> output = new CollectorTestSink<Object>();
+
+  // Captures file names emitted once a file has been fully read.
+  CollectorTestSink<Object> completedFilesPort = new CollectorTestSink<Object>();
+
+  // Captures lines that could not be parsed as Avro records.
+  CollectorTestSink<Object> errorRecordsPort = new CollectorTestSink<Object>();
+
+  AvroFileInputOperator avroFileInput = new AvroFileInputOperator();
+
+  // Records generated by createAvroInput() and written by writeAvroFile().
+  private List<GenericRecord> recordList = null;
+
+  /**
+   * Per-test fixture: creates a test-method-specific directory plus operator and
+   * port contexts, and deletes the directory tree when the test finishes.
+   */
+  public static class TestMeta extends TestWatcher
+  {
+    public String dir = null;
+    Context.OperatorContext context;
+    Context.PortContext portContext;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      String methodName = description.getMethodName();
+      String className = description.getClassName();
+      this.dir = "target/" + className + "/" + methodName;
+      Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(Context.DAGContext.APPLICATION_PATH, dir);
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+      Attribute.AttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      portAttributes.put(Context.PortContext.TUPLE_CLASS, SimpleOrder.class);
+      portContext = new TestPortContext(portAttributes);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  /**
+   * Reads a single Avro file of 7 records and verifies record, error and
+   * completed-file counts. emitTuples is called twice because the first call in
+   * window 0 only initializes the stream (see class comment).
+   */
+  @Test
+  public void testSingleFileAvroReads() throws Exception
+  {
+    FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+
+    int cnt = 7;
+    createAvroInput(cnt);
+    writeAvroFile(new File(FILENAME));
+
+    avroFileInput.output.setSink(output);
+    avroFileInput.completedFilesPort.setSink(completedFilesPort);
+    avroFileInput.errorRecordsPort.setSink(errorRecordsPort);
+    avroFileInput.setDirectory(testMeta.dir);
+    avroFileInput.setup(testMeta.context);
+
+    avroFileInput.beginWindow(0);
+    avroFileInput.emitTuples();
+    avroFileInput.emitTuples();
+    Assert.assertEquals("Record count", cnt, avroFileInput.recordCount);
+    avroFileInput.endWindow();
+    Assert.assertEquals("number tuples", cnt, output.collectedTuples.size());
+    Assert.assertEquals("Error tuples", 0, errorRecordsPort.collectedTuples.size());
+    Assert.assertEquals("Completed File", 1, completedFilesPort.collectedTuples.size());
+    avroFileInput.teardown();
+
+  }
+
+  /**
+   * Reads two Avro files of 7 records each across two windows and verifies the
+   * cumulative tuple, error and completed-file counts.
+   */
+  @Test
+  public void testMultipleFileAvroReads() throws Exception
+  {
+    FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+
+    int cnt = 7;
+
+    createAvroInput(cnt);
+
+    writeAvroFile(new File(FILENAME));
+    writeAvroFile(new File(OTHER_FILE));
+
+    avroFileInput.output.setSink(output);
+    avroFileInput.completedFilesPort.setSink(completedFilesPort);
+    avroFileInput.errorRecordsPort.setSink(errorRecordsPort);
+    avroFileInput.setDirectory(testMeta.dir);
+    avroFileInput.setup(testMeta.context);
+
+    avroFileInput.beginWindow(0);
+    avroFileInput.emitTuples();
+    // NOTE(review): window 0 is never closed with endWindow() before window 1
+    // begins — presumably harmless for this operator, but worth confirming.
+    avroFileInput.beginWindow(1);
+    avroFileInput.emitTuples();
+
+    Assert.assertEquals("number tuples after window 0", cnt, output.collectedTuples.size());
+
+    avroFileInput.emitTuples();
+    avroFileInput.endWindow();
+
+    Assert.assertEquals("Error tuples", 0, errorRecordsPort.collectedTuples.size());
+    Assert.assertEquals("number tuples after window 1", 2 * cnt, output.collectedTuples.size());
+    Assert.assertEquals("Completed File", 2, completedFilesPort.collectedTuples.size());
+
+    avroFileInput.teardown();
+
+  }
+
+  /**
+   * Feeds a file that is not valid Avro and verifies that no tuples are emitted.
+   */
+  @Test
+  public void testInvalidFormatFailure() throws Exception
+  {
+    FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+
+    int cnt = 7;
+    writeErrorFile(cnt, new File(ERROR_FILE));
+
+    avroFileInput.output.setSink(output);
+    avroFileInput.setDirectory(testMeta.dir);
+    avroFileInput.setup(testMeta.context);
+
+    avroFileInput.beginWindow(0);
+    avroFileInput.emitTuples();
+    avroFileInput.emitTuples();
+    avroFileInput.endWindow();
+
+    Assert.assertEquals("number tuples after window 1", 0, output.collectedTuples.size());
+    avroFileInput.teardown();
+  }
+
+  // Populates recordList with cnt GenericRecords matching AVRO_SCHEMA.
+  private void createAvroInput(int cnt)
+  {
+    recordList = Lists.newArrayList();
+
+    while (cnt > 0) {
+      GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(AVRO_SCHEMA));
+      rec.put("orderId", cnt * 1L);
+      rec.put("customerId", cnt * 2);
+      rec.put("total", cnt * 1.5);
+      rec.put("customerName", "*" + cnt + "*");
+      cnt--;
+      recordList.add(rec);
+    }
+  }
+
+  // Writes plain text lines (not Avro) and moves the file into the test directory.
+  private void writeErrorFile(int cnt, File errorFile) throws IOException
+  {
+    // NOTE(review): allLines is populated but never used; only 'lines' is written.
+    List<String> allLines = Lists.newArrayList();
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 0; line < 5; line++) {
+      lines.add("f0" + "l" + line);
+    }
+
+    allLines.addAll(lines);
+
+    FileUtils.write(errorFile, StringUtils.join(lines, '\n'));
+
+    FileUtils.moveFileToDirectory(new File(errorFile.getAbsolutePath()), new File(testMeta.dir), true);
+  }
+
+  // Serializes recordList to an Avro data file and moves it into the test directory.
+  private void writeAvroFile(File outputFile) throws IOException
+  {
+
+    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(
+        new Schema.Parser().parse(AVRO_SCHEMA));
+
+    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
+    dataFileWriter.create(new Schema.Parser().parse(AVRO_SCHEMA), outputFile);
+
+    for (GenericRecord record : recordList) {
+      dataFileWriter.append(record);
+    }
+
+    dataFileWriter.close();
+
+    FileUtils.moveFileToDirectory(new File(outputFile.getAbsolutePath()), new File(testMeta.dir), true);
+
+  }
+
+  /**
+   * End-to-end smoke test: runs a local DAG that reads the Avro files and prints
+   * the GenericRecords to the console.
+   */
+  @Test
+  public void testApplication() throws IOException, Exception
+  {
+    try {
+      FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+      int cnt = 7;
+      createAvroInput(cnt);
+      writeAvroFile(new File(FILENAME));
+      createAvroInput(cnt - 2);
+      writeAvroFile(new File(OTHER_FILE));
+      avroFileInput.setDirectory(testMeta.dir);
+
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+
+      AvroReaderApplication avroReaderApplication = new AvroReaderApplication();
+      avroReaderApplication.setAvroFileInputOperator(avroFileInput);
+      lma.prepareDAG(avroReaderApplication, conf);
+
+      LocalMode.Controller lc = lma.getController();
+      lc.run(10000);// runs for 10 seconds and quits
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  /**
+   * End-to-end smoke test: reads Avro files, converts records to SimpleOrder
+   * POJOs via AvroToPojo, and prints them to the console.
+   */
+  @Test
+  public void testApplicationWithPojoConversion() throws IOException, Exception
+  {
+    try {
+      FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+      int cnt = 7;
+      createAvroInput(cnt);
+      writeAvroFile(new File(FILENAME));
+      createAvroInput(cnt - 2);
+      writeAvroFile(new File(OTHER_FILE));
+
+      avroFileInput.setDirectory(testMeta.dir);
+
+      AvroToPojo avroToPojo = new AvroToPojo();
+      avroToPojo.setPojoClass(SimpleOrder.class);
+
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+
+      AvroToPojoApplication avroToPojoApplication = new AvroToPojoApplication();
+      avroToPojoApplication.setAvroFileInputOperator(avroFileInput);
+      avroToPojoApplication.setAvroToPojo(avroToPojo);
+
+      lma.prepareDAG(avroToPojoApplication, conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.run(10000);// runs for 10 seconds and quits
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  /**
+   * Minimal DAG: Avro file reader wired directly to a console output operator.
+   */
+  public static class AvroReaderApplication implements StreamingApplication
+  {
+
+    AvroFileInputOperator avroFileInputOperator;
+
+    public AvroFileInputOperator getAvroFileInput()
+    {
+      return avroFileInputOperator;
+    }
+
+    public void setAvroFileInputOperator(AvroFileInputOperator avroFileInputOperator)
+    {
+      this.avroFileInputOperator = avroFileInputOperator;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      AvroFileInputOperator avroInputOperator = dag.addOperator("avroInputOperator", getAvroFileInput());
+      ConsoleOutputOperator consoleOutput = dag.addOperator("GenericRecordOp", new ConsoleOutputOperator());
+      dag.addStream("pojo", avroInputOperator.output, consoleOutput.input).setLocality(Locality.CONTAINER_LOCAL);
+    }
+
+  }
+
+  /**
+   * DAG: Avro file reader -> AvroToPojo converter -> console output, with the
+   * converter's output port bound to the SimpleOrder tuple class.
+   */
+  public static class AvroToPojoApplication implements StreamingApplication
+  {
+
+    AvroFileInputOperator avroFileInputOperator;
+    AvroToPojo avroToPojo;
+
+    public AvroFileInputOperator getAvroFileInput()
+    {
+      return avroFileInputOperator;
+    }
+
+    public void setAvroFileInputOperator(AvroFileInputOperator avroFileInputOperator)
+    {
+      this.avroFileInputOperator = avroFileInputOperator;
+    }
+
+    public void setAvroToPojo(AvroToPojo avroToPojo)
+    {
+      this.avroToPojo = avroToPojo;
+    }
+
+    public AvroToPojo getAvroToPojo()
+    {
+      return avroToPojo;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      AvroFileInputOperator avroInputOperator = dag.addOperator("avroInputOperator", getAvroFileInput());
+      AvroToPojo avroToPojo = dag.addOperator("AvroToPojo", getAvroToPojo());
+      ConsoleOutputOperator consoleOutput = dag.addOperator("GenericRecordOp", new ConsoleOutputOperator());
+      dag.getMeta(avroToPojo).getMeta(avroToPojo.output).getAttributes().put(Context.PortContext.TUPLE_CLASS,
+          SimpleOrder.class);
+
+      dag.addStream("GenericRecords", avroInputOperator.output, avroToPojo.data).setLocality(Locality.THREAD_LOCAL);
+      dag.addStream("POJO", avroToPojo.output, consoleOutput.input).setLocality(Locality.CONTAINER_LOCAL);
+    }
+
+  }
+
+  /**
+   * POJO mirroring the Order Avro schema; used as the tuple class in these tests.
+   */
+  public static class SimpleOrder
+  {
+
+    private Integer customerId;
+    private Long orderId;
+    private Double total;
+    private String customerName;
+
+    public SimpleOrder()
+    {
+    }
+
+    public SimpleOrder(int customerId, long orderId, double total, String customerName)
+    {
+      setCustomerId(customerId);
+      setOrderId(orderId);
+      setTotal(total);
+      setCustomerName(customerName);
+    }
+
+    public String getCustomerName()
+    {
+      return customerName;
+    }
+
+    public void setCustomerName(String customerName)
+    {
+      this.customerName = customerName;
+    }
+
+    public Integer getCustomerId()
+    {
+      return customerId;
+    }
+
+    public void setCustomerId(Integer customerId)
+    {
+      this.customerId = customerId;
+    }
+
+    public Long getOrderId()
+    {
+      return orderId;
+    }
+
+    public void setOrderId(Long orderId)
+    {
+      this.orderId = orderId;
+    }
+
+    public Double getTotal()
+    {
+      return total;
+    }
+
+    public void setTotal(Double total)
+    {
+      this.total = total;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "SimpleOrder [customerId=" + customerId + ", orderId=" + orderId + ", total=" + total + ", customerName="
+          + customerName + "]";
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5075ce0e/contrib/src/test/java/com/datatorrent/contrib/avro/AvroToPojoTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/AvroToPojoTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroToPojoTest.java
new file mode 100644
index 0000000..23714a3
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroToPojoTest.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import java.util.List;
+import java.util.ListIterator;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.python.google.common.collect.Lists;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.helper.TestPortContext;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Tests for {@link AvroToPojo}: converts GenericRecords to SimpleOrder POJOs
+ * via an explicit field mapping or via reflection, and checks error counters.
+ */
+public class AvroToPojoTest
+{
+  // Field mapping entries, colon-separated; presumably record field : POJO
+  // field : type — TODO confirm the order against AvroToPojo's parser.
+  public static final String fieldInfoInitMap = "orderId:orderId:LONG," + "customerId:customerId:INTEGER,"
+      + "customerName:customerName:STRING," + "total:total:DOUBLE";
+
+  // Same mapping but with customerName declared as BYTES (an invalid type for
+  // the string-typed schema field) to exercise the type-mismatch path.
+  public static final String byteFieldInfoInitMap = "orderId:orderId:LONG," + "customerId:customerId:INTEGER,"
+      + "customerName:customerName:BYTES," + "total:total:DOUBLE";
+
+  private static final String AVRO_SCHEMA = "{\"namespace\":\"abc\"," + ""
+      + "\"type\":\"record\",\"doc\":\"Order schema\"," + "\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\","
+      + "\"type\": \"long\"}," + "{\"name\":\"customerId\",\"type\": \"int\"},"
+      + "{\"name\":\"total\",\"type\": \"double\"}," + "{\"name\":\"customerName\",\"type\": \"string\"}]}";
+
+  private static final String AVRO_SCHEMA_FOR_BYTES = "{\"namespace\":\"abc\"," + ""
+      + "\"type\":\"record\",\"doc\":\"Order schema\"," + "\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\","
+      + "\"type\": \"long\"}," + "{\"name\":\"customerId\",\"type\": \"int\"},"
+      + "{\"name\":\"total\",\"type\": \"double\"}," + "{\"name\":\"customerName\",\"type\": \"bytes\"}]}";
+
+  CollectorTestSink<Object> outputSink = new CollectorTestSink<Object>();
+  AvroToPojo avroReader = new AvroToPojo();
+
+  // Three GenericRecords built by createReaderInput() before each test.
+  private List<GenericRecord> recordList = null;
+
+  /**
+   * Per-test fixture: binds the output sink, builds the input records and sets
+   * up a port context carrying the SimpleOrder tuple class.
+   */
+  public class TestMeta extends TestWatcher
+  {
+    // NOTE(review): context is never assigned, so avroReader.setup(null) is
+    // effectively called in the tests — confirm the operator tolerates this.
+    Context.OperatorContext context;
+    Context.PortContext portContext;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      Attribute.AttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      portAttributes.put(Context.PortContext.TUPLE_CLASS, SimpleOrder.class);
+      portContext = new TestPortContext(portAttributes);
+      super.starting(description);
+      avroReader.output.setSink(outputSink);
+      createReaderInput();
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      avroReader.teardown();
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  /**
+   * Converts three records using the explicit field mapping and expects three
+   * POJOs on the output port.
+   */
+  @Test
+  public void testAvroReads() throws Exception
+  {
+
+    avroReader.setPojoClass(SimpleOrder.class);
+    avroReader.setGenericRecordToPOJOFieldsMapping(fieldInfoInitMap);
+    avroReader.output.setup(testMeta.portContext);
+    avroReader.setup(testMeta.context);
+
+    avroReader.beginWindow(0);
+
+    ListIterator<GenericRecord> itr = recordList.listIterator();
+
+    while (itr.hasNext()) {
+      avroReader.data.process(itr.next());
+    }
+
+    avroReader.endWindow();
+    Assert.assertEquals("Number of tuples", 3, outputSink.collectedTuples.size());
+    avroReader.teardown();
+
+  }
+
+  /**
+   * Uses the BYTES mapping against string data; records are still emitted, so
+   * an invalid field type apparently does not drop the whole tuple.
+   */
+  @Test
+  public void testAvroReadsInvalidDataType() throws Exception
+  {
+
+    avroReader.setPojoClass(SimpleOrder.class);
+    avroReader.setGenericRecordToPOJOFieldsMapping(byteFieldInfoInitMap);
+    avroReader.output.setup(testMeta.portContext);
+    avroReader.setup(testMeta.context);
+
+    avroReader.beginWindow(0);
+
+    ListIterator<GenericRecord> itr = recordList.listIterator();
+
+    while (itr.hasNext()) {
+      avroReader.data.process(itr.next());
+    }
+
+    avroReader.endWindow();
+    Assert.assertEquals("Number of tuples", 3, outputSink.collectedTuples.size());
+    avroReader.teardown();
+
+  }
+
+  /**
+   * Omits the explicit mapping so the operator derives fields from the POJO
+   * class by reflection; expects the same three output tuples.
+   */
+  @Test
+  public void testAvroReadsWithReflection() throws Exception
+  {
+
+    avroReader.setPojoClass(SimpleOrder.class);
+    avroReader.output.setup(testMeta.portContext);
+    avroReader.setup(testMeta.context);
+
+    avroReader.beginWindow(0);
+
+    ListIterator<GenericRecord> itr = recordList.listIterator();
+
+    while (itr.hasNext()) {
+      avroReader.data.process(itr.next());
+    }
+
+    avroReader.endWindow();
+    Assert.assertEquals("Number of tuples", 3, outputSink.collectedTuples.size());
+    avroReader.teardown();
+
+  }
+
+  /**
+   * Corrupts the long orderId field with a String in every record and expects
+   * each of the three records to be counted as an error.
+   */
+  @Test
+  public void testReadFailures() throws Exception
+  {
+
+    avroReader.setPojoClass(SimpleOrder.class);
+    avroReader.setGenericRecordToPOJOFieldsMapping(fieldInfoInitMap);
+    avroReader.output.setup(testMeta.portContext);
+    avroReader.setup(testMeta.context);
+
+    avroReader.beginWindow(0);
+
+    ListIterator<GenericRecord> itr = recordList.listIterator();
+
+    while (itr.hasNext()) {
+      GenericRecord rec = itr.next();
+      rec.put("orderId", "abc");
+      avroReader.data.process(rec);
+    }
+
+    Assert.assertEquals("Number of tuples", 3, avroReader.errorCount);
+    avroReader.endWindow();
+    avroReader.teardown();
+
+  }
+
+  /**
+   * Processes null records; expects 12 field errors — 3 records times the 4
+   * mapped fields.
+   */
+  @Test
+  public void testReadFieldFailures() throws Exception
+  {
+
+    int cnt = 3;
+
+    avroReader.setPojoClass(SimpleOrder.class);
+    avroReader.setGenericRecordToPOJOFieldsMapping(fieldInfoInitMap);
+    avroReader.output.setup(testMeta.portContext);
+    avroReader.setup(testMeta.context);
+
+    avroReader.beginWindow(0);
+
+    for (int i = 0; i < cnt; i++) {
+      avroReader.data.process(null);
+    }
+
+    Assert.assertEquals("Number of tuples", 12, avroReader.fieldErrorCount);
+
+    avroReader.endWindow();
+    avroReader.teardown();
+
+  }
+
+  // Populates recordList with 3 GenericRecords matching AVRO_SCHEMA.
+  private void createReaderInput()
+  {
+    int cnt = 3;
+
+    recordList = Lists.newArrayList();
+
+    while (cnt > 0) {
+      GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(AVRO_SCHEMA));
+      rec.put("orderId", cnt * 1L);
+      rec.put("customerId", cnt * 2);
+      rec.put("total", cnt * 1.5);
+      rec.put("customerName", "*" + cnt + "*");
+      cnt--;
+      recordList.add(rec);
+    }
+  }
+
+  /**
+   * POJO mirroring the Order Avro schema; target tuple class for conversion.
+   */
+  public static class SimpleOrder
+  {
+
+    private Integer customerId;
+    private Long orderId;
+    private Double total;
+    private String customerName;
+
+    public SimpleOrder()
+    {
+    }
+
+    public SimpleOrder(int customerId, long orderId, double total, String customerName)
+    {
+      setCustomerId(customerId);
+      setOrderId(orderId);
+      setTotal(total);
+      setCustomerName(customerName);
+    }
+
+    public String getCustomerName()
+    {
+      return customerName;
+    }
+
+    public void setCustomerName(String customerName)
+    {
+      this.customerName = customerName;
+    }
+
+    public Integer getCustomerId()
+    {
+      return customerId;
+    }
+
+    public void setCustomerId(Integer customerId)
+    {
+      this.customerId = customerId;
+    }
+
+    public Long getOrderId()
+    {
+      return orderId;
+    }
+
+    public void setOrderId(Long orderId)
+    {
+      this.orderId = orderId;
+    }
+
+    public Double getTotal()
+    {
+      return total;
+    }
+
+    public void setTotal(Double total)
+    {
+      this.total = total;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "SimpleOrder [customerId=" + customerId + ", orderId=" + orderId + ", total=" + total + ", customerName="
+          + customerName + "]";
+    }
+
+  }
+
+  /**
+   * Minimal POJO that does not match the Order schema; used for failure cases.
+   */
+  public static class Order
+  {
+
+    private int orderId;
+
+    public Order()
+    {
+
+    }
+
+    public Order(int orderId)
+    {
+      this.orderId = orderId;
+    }
+
+    public int getOrderId()
+    {
+      return orderId;
+    }
+
+    public void setOrderId(int orderId)
+    {
+      this.orderId = orderId;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Order [orderId=" + orderId + "]";
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5075ce0e/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java
new file mode 100644
index 0000000..772a057
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import java.util.List;
+import java.util.ListIterator;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.python.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.helper.TestPortContext;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Tests for {@link PojoToAvro}: converts SimpleOrder POJOs to GenericRecords
+ * against the Order schema, and checks error counters on mismatched tuples.
+ */
+public class PojoToAvroTest
+{
+
+  // Order record schema: long orderId, int customerId, double total, string customerName.
+  private static final String AVRO_SCHEMA = "{\"namespace\":\"abc\"," + ""
+      + "\"type\":\"record\",\"doc\":\"Order schema\"," + "\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\","
+      + "\"type\": \"long\"}," + "{\"name\":\"customerId\",\"type\": \"int\"},"
+      + "{\"name\":\"total\",\"type\": \"double\"}," + "{\"name\":\"customerName\",\"type\": \"string\"}]}";
+
+  CollectorTestSink<Object> outputSink = new CollectorTestSink<Object>();
+  PojoToAvro avroWriter = new PojoToAvro();
+
+  /**
+   * Per-test fixture: binds the output sink and creates a port context carrying
+   * the SimpleOrder tuple class.
+   */
+  public class TestMeta extends TestWatcher
+  {
+    public String dir = null;
+    // NOTE(review): context is never assigned, so avroWriter.setup(null) is
+    // effectively called in the tests — confirm the operator tolerates this.
+    Context.OperatorContext context;
+    Context.PortContext portContext;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      Attribute.AttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      portAttributes.put(Context.PortContext.TUPLE_CLASS, SimpleOrder.class);
+      portContext = new TestPortContext(portAttributes);
+      super.starting(description);
+      avroWriter.output.setSink(outputSink);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      avroWriter.teardown();
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  /**
+   * Converts three SimpleOrder POJOs and expects three GenericRecords on the
+   * output port.
+   */
+  @Test
+  public void testWriting() throws Exception
+  {
+
+    List<SimpleOrder> orderList = Lists.newArrayList();
+    orderList.add(new SimpleOrder(1, 11, 100.25, "customerOne"));
+    orderList.add(new SimpleOrder(2, 22, 200.25, "customerTwo"));
+    orderList.add(new SimpleOrder(3, 33, 300.25, "customerThree"));
+
+    avroWriter.setSchemaString(AVRO_SCHEMA);
+    avroWriter.data.setup(testMeta.portContext);
+    avroWriter.setup(testMeta.context);
+
+    avroWriter.beginWindow(0);
+
+    ListIterator<SimpleOrder> itr = orderList.listIterator();
+
+    while (itr.hasNext()) {
+      avroWriter.data.process(itr.next());
+    }
+
+    avroWriter.endWindow();
+    Assert.assertEquals("Number of tuples", 3, outputSink.collectedTuples.size());
+    avroWriter.teardown();
+
+  }
+
+  /**
+   * Feeds Order tuples while the port is configured for SimpleOrder, so every
+   * field getter lookup fails: expects 12 field errors (3 records x 4 fields),
+   * 3 record errors, and no output tuples.
+   */
+  @Test
+  public void testWriteFailure() throws Exception
+  {
+
+    List<Order> orderList = Lists.newArrayList();
+    orderList.add(new Order(11));
+    orderList.add(new Order(22));
+    orderList.add(new Order(33));
+
+    avroWriter.setSchemaString(AVRO_SCHEMA);
+
+    avroWriter.setup(testMeta.context);
+    avroWriter.data.setup(testMeta.portContext);
+
+    avroWriter.beginWindow(0);
+
+    ListIterator<Order> itr = orderList.listIterator();
+
+    while (itr.hasNext()) {
+      avroWriter.data.process(itr.next());
+    }
+
+    Assert.assertEquals("Field write failures", 12, avroWriter.fieldErrorCount);
+
+    Assert.assertEquals("Record write failures", 3, avroWriter.errorCount);
+
+    avroWriter.endWindow();
+
+    Assert.assertEquals("Number of tuples", 0, outputSink.collectedTuples.size());
+
+    avroWriter.teardown();
+
+  }
+
+  /**
+   * POJO mirroring the Order Avro schema; source tuple class for conversion.
+   */
+  public static class SimpleOrder
+  {
+
+    private Integer customerId;
+    private Long orderId;
+    private Double total;
+    private String customerName;
+
+    public SimpleOrder()
+    {
+    }
+
+    public SimpleOrder(int customerId, long orderId, double total, String customerName)
+    {
+      setCustomerId(customerId);
+      setOrderId(orderId);
+      setTotal(total);
+      setCustomerName(customerName);
+    }
+
+    public String getCustomerName()
+    {
+      return customerName;
+    }
+
+    public void setCustomerName(String customerName)
+    {
+      this.customerName = customerName;
+    }
+
+    public Integer getCustomerId()
+    {
+      return customerId;
+    }
+
+    public void setCustomerId(Integer customerId)
+    {
+      this.customerId = customerId;
+    }
+
+    public Long getOrderId()
+    {
+      return orderId;
+    }
+
+    public void setOrderId(Long orderId)
+    {
+      this.orderId = orderId;
+    }
+
+    public Double getTotal()
+    {
+      return total;
+    }
+
+    public void setTotal(Double total)
+    {
+      this.total = total;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "SimpleOrder [customerId=" + customerId + ", orderId=" + orderId + ", total=" + total + ", customerName="
+          + customerName + "]";
+    }
+
+  }
+
+  /**
+   * Minimal POJO that does not match the Order schema; used for failure cases.
+   */
+  public static class Order
+  {
+
+    private int orderId;
+
+    public Order()
+    {
+
+    }
+
+    public Order(int orderId)
+    {
+      this.orderId = orderId;
+    }
+
+    public int getOrderId()
+    {
+      return orderId;
+    }
+
+    public void setOrderId(int orderId)
+    {
+      this.orderId = orderId;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Order [orderId=" + orderId + "]";
+    }
+
+  }
+
+}


Mime
View raw message