apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2011) POJO to Avro record converter
Date Wed, 16 Mar 2016 08:51:33 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15197043#comment-15197043 ]

ASF GitHub Bot commented on APEXMALHAR-2011:
--------------------------------------------

Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/211#discussion_r56299066
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java ---
    @@ -0,0 +1,296 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.contrib.avro;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectOutputStream;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.avro.AvroRuntimeException;
    +import org.apache.avro.Schema;
    +import org.apache.avro.Schema.Field;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.api.Context.PortContext;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.lib.util.PojoUtils;
    +import com.datatorrent.lib.util.PojoUtils.Getter;
    +
    +/**
    + * <p>
    + * PojoToAvro
    + * </p>
    + * A generic implementation for POJO to Avro conversion. A POJO is converted to
    + * a GenericRecord based on the schema provided. As of now only primitive types
    + * are supported.
    + *
    + * @displayName Pojo To Avro
    + * @category Converter
    + * @tags avro
    + * @since 3.3.0
    + */
    +
    +@InterfaceStability.Evolving
    +public class PojoToAvro implements Operator
    +{
    +
    +  private List<Field> columnNames;
    +
    +  private Class<?> cls;
    +
    +  private List<Getter> keyMethodMap;
    +
    +  private String schemaFile;
    +
    +  private Schema schema;
    +
    +  @AutoMetric
    +  int recordCnt = 0;
    +
    +  @AutoMetric
    +  int errorCnt = 0;
    +
    +  @AutoMetric
    +  int fieldErrorCnt = 0;
    +
    +  public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<GenericRecord>();
    +
    +  @Override
    +  public void setup(OperatorContext context)
    +  {
    +    try {
    +      parseSchema();
    +    } catch (IOException e) {
    +      LOG.error("Exception in parsing schema");
    +    }
    +    initializeColumnMap(getSchema());
    +  }
    +
    +  public void parseSchema() throws IOException
    +  {
    +    setSchema(new Schema.Parser().parse(getSchemaString()));
    +  }
    +
    +  public String getSchemaString()
    +  {
    +    return schemaFile;
    +  }
    +
    +  public void setSchemaString(String schemaFile)
    +  {
    +    this.schemaFile = schemaFile;
    +  }
    +
    +  public Schema getSchema()
    +  {
    +    return schema;
    +  }
    +
    +  public void setSchema(Schema schema)
    +  {
    +    this.schema = schema;
    +  }
    +
    +  public List<Field> getColumnNames()
    +  {
    +    return columnNames;
    +  }
    +
    +  public void setColumnNames(List<Field> columnNames)
    +  {
    +    this.columnNames = columnNames;
    +  }
    +
    +  public Class<?> getCls()
    +  {
    +    return cls;
    +  }
    +
    +  public void setCls(Class<?> cls)
    +  {
    +    this.cls = cls;
    +  }
    +
    +  public List<Getter> getKeyMethodMap()
    +  {
    +    return keyMethodMap;
    +  }
    +
    +  public void setKeyMethodMap(List<Getter> keyMethodMap)
    +  {
    +    this.keyMethodMap = keyMethodMap;
    +  }
    +
    +  /**
    +   * Adding this as a plug for being able to serialize non primitive types
    +   * 
    +   * @param -
    +   *          object to serialize Returns a byte array
    +   */
    +
    +  public byte[] serialize(Object obj) throws IOException
    +  {
    +    try (ByteArrayOutputStream b = new ByteArrayOutputStream()) {
    +      try (ObjectOutputStream o = new ObjectOutputStream(b)) {
    +        o.writeObject(obj);
    +      }
    +      return b.toByteArray();
    +    }
    +  }
    +
    +  /**
    +   * @param -
    +   *          className
    +   * @param -
    +   *          name of the field to create the getter for Returns a getter
    +   */
    +
    +  private Getter<?, ?> generateGettersForField(Class<?> cls, String inputFieldName)
    +      throws NoSuchFieldException, SecurityException
    +  {
    +    java.lang.reflect.Field f = cls.getDeclaredField(inputFieldName);
    +    Class<?> c = ClassUtils.primitiveToWrapper(f.getType());
    +
    +    Getter<?, ?> classGetter = PojoUtils.createGetter(cls, inputFieldName, c);
    +
    +    return classGetter;
    +  }
    +
    +  /**
    +   * @param -
    +   *          schema of the generic record Assumption is that the name of a
    +   *          field in POJO is the same as the name in Avro schema
    +   */
    +
    +  public void initializeColumnMap(Schema schema)
    +  {
    +    columnNames = schema.getFields();
    +
    +    keyMethodMap = new ArrayList<Getter>();
    +    for (int i = 0; i < columnNames.size(); i++) {
    +      try {
    +        keyMethodMap.add(generateGettersForField(cls, columnNames.get(i).name()));
    +      } catch (NoSuchFieldException | SecurityException e) {
    +        throw new RuntimeException("Failed to initialize pojo class getters for field:
", e);
    +      }
    +    }
    +  }
    +
    +  @InputPortFieldAnnotation(optional = true, schemaRequired = true)
    +  public final transient DefaultInputPort<Object> data = new DefaultInputPort<Object>()
    +  {
    +
    +    @Override
    +    public void setup(PortContext context)
    +    {
    +      cls = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +
    +    @Override
    +    public void process(Object tuple)
    +    {
    +
    +      GenericRecord record = null;
    +
    +      try {
    +        record = getGenericRecord(tuple);
    +      } catch (Exception e) {
    +        LOG.error("Exception in parsing record");
    +        errorCnt++;
    +      }
    +
    +      if (record != null) {
    +        output.emit(record);
    +        recordCnt++;
    +      } else {
    +        errorCnt++;
    +      }
    +
    +    }
    +
    +  };
    +
    +  /**
    +   * @param -Object
    +   *          Returns a generic record mapping the POJO fields to provided
    +   *          schema
    +   */
    +
    +  public GenericRecord getGenericRecord(Object tuple) throws Exception
    --- End diff --
    
    Why public?


> POJO to Avro record converter
> -----------------------------
>
>                 Key: APEXMALHAR-2011
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2011
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: devendra tagare
>
> We are looking to develop a record converter which would take a POJO as input and
> emit a GenericRecord as output, based on the given Avro schema.
> The expected inputs for this operator would be:
> 1. Class name of the incoming POJO
> 2. Avro schema for the GenericRecord to emit
> This operator would receive an Object on its input port and emit a GenericRecord on
> the output port.
> To start with, we would handle primitive types and then go on to handle complex types.
> Thanks,
> Dev



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message