apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2011) POJO to Avro record converter
Date Wed, 30 Mar 2016 07:14:25 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15217558#comment-15217558 ]

ASF GitHub Bot commented on APEXMALHAR-2011:
--------------------------------------------

Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/211#discussion_r57845253
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java ---
    @@ -0,0 +1,415 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.avro;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectOutputStream;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.StringTokenizer;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.avro.AvroRuntimeException;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.google.common.collect.Lists;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Context.PortContext;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.FieldInfo.SupportType;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +/**
    + * <p>
    + * AvroToPojo
    + * </p>
    + * A generic implementation for conversion from Avro to POJO. The POJO class
    + * name & field mapping should be provided by the user. If this mapping is not
    + * provided then reflection is used to determine this mapping. As of now only
    + * primitive types are supported.
    + *
    + * @displayName Avro To Pojo
    + * @category Converter
    + * @tags avro
    + * @since 3.3.0
    + */
    +@InterfaceStability.Evolving
    +public class AvroToPojo extends BaseOperator
    +{
    +
    +  private Class<?> cls;
    +
    +  private static final String FIELD_SEPARATOR = ":";
    +  private static final String RECORD_SEPARATOR = ",";
    +
    +  private String genericRecordToPOJOFieldsMapping = null;
    +
    +  private List<FieldInfo> fieldInfos;
    +
    +  private List<ActiveFieldInfo> columnFieldSetters = null;
    +
    +  @AutoMetric
    +  int recordCnt = 0;
    +
    +  @AutoMetric
    +  int errorCnt = 0;
    +
    +  @AutoMetric
    +  int fieldErrorCnt = 0;
    +
    +  /**
    +   * Retruns a string representing mapping from generic record to POJO fields
    +   */
    +  public String getGenericRecordToPOJOFieldsMapping()
    +  {
    +    return genericRecordToPOJOFieldsMapping;
    +  }
    +
    +  /**
    +   * Comma separated list mapping a field in Avro schema to POJO field eg :
    +   * orderId:orderId:INTEGER
    +   */
    +  public void setGenericRecordToPOJOFieldsMapping(String genericRecordToPOJOFieldsMapping)
    +  {
    +    this.genericRecordToPOJOFieldsMapping = genericRecordToPOJOFieldsMapping;
    +  }
    +
    +  @InputPortFieldAnnotation(optional = false)
    +  public final transient DefaultInputPort<GenericRecord> data = new DefaultInputPort<GenericRecord>()
    +  {
    +    @Override
    +    public void process(GenericRecord tuple)
    +    {
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  /**
    +   * Converts given Generic Record and to a POJO and emits it
    +   */
    +  protected void processTuple(GenericRecord tuple)
    +  {
    +    try {
    +      Object obj = getPOJOFromGenericRecord(tuple, getCls());
    +
    +      if (obj != null) {
    +        output.emit(obj);
    +        recordCnt++;
    +      }
    +
    +    } catch (InstantiationException | IllegalAccessException e) {
    +      LOG.error("Could not initialize object of class - " + getClass().getName());
    +      errorCnt++;
    +    }
    +  }
    +
    +  /**
    +   * Returns a POJO from a Generic Record
    +   * 
    +   * @return Object
    +   */
    +  @SuppressWarnings("unchecked")
    +  private Object getPOJOFromGenericRecord(GenericRecord tuple, Class<?> cls)
    +      throws InstantiationException, IllegalAccessException
    +  {
    +    Object newObj = getCls().newInstance();
    +
    +    try {
    +      for (int i = 0; i < columnFieldSetters.size(); i++) {
    +
    +        AvroToPojo.ActiveFieldInfo afi = columnFieldSetters.get(i);
    +        afi = columnFieldSetters.get(i);
    +        SupportType st = afi.fieldInfo.getType();
    +        Object val = null;
    +
    +        try {
    +          val = tuple.get(afi.fieldInfo.getColumnName());
    +        } catch (Exception e) {
    +          LOG.error("Could not find field -" + afi.fieldInfo.getColumnName() + "- in the generic record");
    +          val = null;
    +          fieldErrorCnt++;
    +        }
    +
    +        if (val == null) {
    +          continue;
    +        }
    +
    +        try {
    +          switch (st) {
    +            case BOOLEAN:
    +              ((PojoUtils.SetterBoolean<Object>)afi.setterOrGetter).set(newObj,
    +                  (boolean)tuple.get(afi.fieldInfo.getColumnName()));
    +              break;
    +
    +            case DOUBLE:
    +              ((PojoUtils.SetterDouble<Object>)afi.setterOrGetter).set(newObj,
    +                  (double)tuple.get(afi.fieldInfo.getColumnName()));
    +              break;
    +
    +            case FLOAT:
    +              ((PojoUtils.SetterFloat<Object>)afi.setterOrGetter).set(newObj,
    +                  (float)tuple.get(afi.fieldInfo.getColumnName()));
    +              break;
    +
    +            case INTEGER:
    +              ((PojoUtils.SetterInt<Object>)afi.setterOrGetter).set(newObj,
    +                  (int)tuple.get(afi.fieldInfo.getColumnName()));
    +              break;
    +
    +            case STRING:
    +              ((PojoUtils.Setter<Object, String>)afi.setterOrGetter).set(newObj,
    +                  new String(tuple.get(afi.fieldInfo.getColumnName()).toString()));
    +              break;
    +
    +            case LONG:
    +              ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(newObj,
    +                  (long)tuple.get(afi.fieldInfo.getColumnName()));
    +              break;
    +
    +            default:
    +              ((PojoUtils.Setter<Object, byte[]>)afi.setterOrGetter).set(newObj,
    +                  serialize(tuple.get(afi.fieldInfo.getColumnName())));
    +              break;
    +          }
    +        } catch (AvroRuntimeException | IOException e) {
    +          LOG.error("Exception in setting value" + e.getMessage());
    +          fieldErrorCnt++;
    +        }
    +
    +      }
    +    } catch (Exception ex) {
    +      LOG.error("Generic Exception in setting value" + ex.getMessage());
    +      errorCnt++;
    +      newObj = null;
    +    }
    +    return newObj;
    +  }
    +
    +  /**
    +   * Use reflection to generate field info values if the user has not provided
    +   * the inputs mapping
    +   * 
    +   * @return String representing the POJO field to Avro field mapping
    +   */
    +  public String generateFieldInfoInputs(Class<?> cls)
    +  {
    +    java.lang.reflect.Field[] fields = cls.getDeclaredFields();
    +    StringBuilder sb = new StringBuilder();
    +
    +    for (int i = 0; i < fields.length; i++) {
    +      java.lang.reflect.Field f = fields[i];
    +      Class<?> c = ClassUtils.primitiveToWrapper(f.getType());
    +      sb.append(f.getName() + FIELD_SEPARATOR + f.getName() + FIELD_SEPARATOR + c.getSimpleName().toUpperCase()
    +          + RECORD_SEPARATOR);
    +    }
    +    return sb.substring(0, sb.length() - 1);
    +  }
    +
    +  /**
    +   * Creates a map representing fieldName in POJO:field in Generic Record:Data
    +   * type
    +   * 
    +   * @return List of FieldInfo
    +   */
    +  public List<FieldInfo> createFieldInfoMap(String str)
    --- End diff --
    
    Can this be made private?


> POJO to Avro record converter
> -----------------------------
>
>                 Key: APEXMALHAR-2011
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2011
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: devendra tagare
>
> We are looking to develop a record converter which would take a POJO as an input and emit a Generic record as the output based on the given Avro schema.
> The expected inputs for this operator would be,
> 1.Class Name of the incoming POJO
> 2.Avro schema for the Generic Record to emit.
> This operator would receive an Object on its input port and emit a Generic record on the output port.
> To start with, we would handle primitive types and then go on to handle complex types.
> Thanks,
> Dev



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message