apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chinmaykolhatkar <...@git.apache.org>
Subject [GitHub] incubator-apex-malhar pull request: APEXMALHAR-2011-2012 Avro to P...
Date Wed, 16 Mar 2016 08:51:03 GMT
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/211#discussion_r56299041
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java ---
    @@ -0,0 +1,296 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.contrib.avro;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectOutputStream;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.avro.AvroRuntimeException;
    +import org.apache.avro.Schema;
    +import org.apache.avro.Schema.Field;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.api.Context.PortContext;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.lib.util.PojoUtils;
    +import com.datatorrent.lib.util.PojoUtils.Getter;
    +
    +/**
    + * <p>
    + * PojoToAvro
    + * </p>
    + * A generic implementation for POJO to Avro conversion. A POJO is converted to
    + * a GenericRecord based on the schema provided. As of now only primitive types
    + * are supported.
    + *
    + * @displayName Pojo To Avro
    + * @category Converter
    + * @tags avro
    + * @since 3.3.0
    + */
    +
    +@InterfaceStability.Evolving
    +public class PojoToAvro implements Operator
    +{
    +
    +  /** Fields of the Avro schema, set from {@code schema.getFields()} in initializeColumnMap. */
    +  private List<Field> columnNames;
    +
    +  /** POJO class of incoming tuples; set from the input port's TUPLE_CLASS attribute. */
    +  private Class<?> cls;
    +
    +  /** One getter per schema field, in the same order as {@link #columnNames}. */
    +  private List<Getter> keyMethodMap;
    +
    +  /**
    +   * Avro schema text handed to {@code Schema.Parser#parse(String)}; despite the name this
    +   * holds the schema itself, not a file path. Exposed via getSchemaString/setSchemaString.
    +   */
    +  private String schemaFile;
    +
    +  /** Parsed Avro schema; set by parseSchema() or setSchema(). */
    +  private Schema schema;
    +
    +  /** Number of records emitted on {@link #output}. */
    +  @AutoMetric
    +  int recordCnt = 0;
    +
    +  /** Incremented by the data port when a tuple fails conversion or converts to null. */
    +  @AutoMetric
    +  int errorCnt = 0;
    +
    +  // NOTE(review): not updated anywhere in the visible part of this file; presumably meant
    +  // to count per-field conversion errors -- verify against getGenericRecord.
    +  @AutoMetric
    +  int fieldErrorCnt = 0;
    +
    +  /** Emits the GenericRecord produced for each successfully converted tuple. */
    +  public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<GenericRecord>();
    +
    +  /**
    +   * Parses the configured Avro schema and builds the per-field getters for the POJO
    +   * class of incoming tuples.
    +   *
    +   * @param context operator context (not used here)
    +   * @throws RuntimeException wrapping the {@link IOException} if the schema cannot be parsed
    +   */
    +  @Override
    +  public void setup(OperatorContext context)
    +  {
    +    try {
    +      parseSchema();
    +    } catch (IOException e) {
    +      // Continuing would build getters from an unset or stale schema; fail fast instead
    +      // and keep the cause so the parse error is visible.
    +      LOG.error("Exception in parsing schema", e);
    +      throw new RuntimeException("Exception in parsing schema", e);
    +    }
    +    initializeColumnMap(getSchema());
    +  }
    +
    +  /**
    +   * Parses the schema text returned by {@link #getSchemaString()} and stores the result
    +   * through {@link #setSchema(Schema)}.
    +   *
    +   * @throws IOException declared by the signature; malformed schema text is reported by the
    +   *           Avro parser as an unchecked exception (verify against the Avro version in use)
    +   */
    +  public void parseSchema() throws IOException
    +  {
    +    Schema.Parser parser = new Schema.Parser();
    +    Schema parsed = parser.parse(getSchemaString());
    +    setSchema(parsed);
    +  }
    +
    +  /**
    +   * Returns the Avro schema text that {@link #parseSchema()} parses.
    +   *
    +   * @return the schema text, or {@code null} if it has not been configured
    +   */
    +  public String getSchemaString()
    +  {
    +    return this.schemaFile;
    +  }
    +
    +  /**
    +   * Sets the Avro schema text (the schema itself, not a path to it).
    +   *
    +   * @param schemaFile the schema text to parse during setup
    +   */
    +  public void setSchemaString(String schemaFile)
    +  {
    +    this.schemaFile = schemaFile;
    +  }
    +
    +  /**
    +   * Returns the parsed Avro schema.
    +   *
    +   * @return the schema set by {@link #parseSchema()} or {@link #setSchema(Schema)}
    +   */
    +  public Schema getSchema()
    +  {
    +    return this.schema;
    +  }
    +
    +  /**
    +   * Replaces the parsed Avro schema.
    +   *
    +   * @param schema the schema used to build getters and records
    +   */
    +  public void setSchema(Schema schema)
    +  {
    +    this.schema = schema;
    +  }
    +
    +  /**
    +   * Returns the Avro schema fields that are converted.
    +   *
    +   * @return the schema fields, in schema order
    +   */
    +  public List<Field> getColumnNames()
    +  {
    +    return this.columnNames;
    +  }
    +
    +  /**
    +   * Replaces the list of Avro schema fields.
    +   *
    +   * @param columnNames the schema fields to use
    +   */
    +  public void setColumnNames(List<Field> columnNames)
    +  {
    +    this.columnNames = columnNames;
    +  }
    +
    +  /**
    +   * Returns the POJO class of incoming tuples.
    +   *
    +   * @return the tuple class
    +   */
    +  public Class<?> getCls()
    +  {
    +    return this.cls;
    +  }
    +
    +  /**
    +   * Sets the POJO class of incoming tuples.
    +   *
    +   * @param cls the tuple class whose fields are read
    +   */
    +  public void setCls(Class<?> cls)
    +  {
    +    this.cls = cls;
    +  }
    +
    +  /**
    +   * Returns the per-field getters, index-aligned with {@link #getColumnNames()}.
    +   *
    +   * @return the getter list
    +   */
    +  public List<Getter> getKeyMethodMap()
    +  {
    +    return this.keyMethodMap;
    +  }
    +
    +  /**
    +   * Replaces the per-field getters.
    +   *
    +   * @param keyMethodMap getters in the same order as the schema fields
    +   */
    +  public void setKeyMethodMap(List<Getter> keyMethodMap)
    +  {
    +    this.keyMethodMap = keyMethodMap;
    +  }
    +
    +  /**
    +   * Serializes an object with Java serialization. Provided as a hook for handling
    +   * non-primitive field values.
    +   *
    +   * @param obj object to serialize; must be {@link java.io.Serializable} or {@code null}
    +   * @return the serialized bytes
    +   * @throws IOException if serialization fails, e.g. {@link java.io.NotSerializableException}
    +   */
    +  public byte[] serialize(Object obj) throws IOException
    +  {
    +    // Closing a ByteArrayOutputStream has no effect, so only the object stream needs managing.
    +    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    +    // Closing the ObjectOutputStream flushes its buffered data; read the bytes only afterwards.
    +    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
    +      out.writeObject(obj);
    +    }
    +    return bytes.toByteArray();
    +  }
    +
    +  /**
    +   * Creates a getter that reads the named field from instances of the given class.
    +   * The field's declared type is looked up reflectively and primitive types are mapped to
    +   * their wrapper class before the getter is created.
    +   *
    +   * @param cls class that declares the field
    +   * @param inputFieldName name of the field to create the getter for
    +   * @return a getter for {@code inputFieldName}
    +   * @throws NoSuchFieldException if {@code cls} does not itself declare the field
    +   *           ({@link Class#getDeclaredField(String)} does not search superclasses)
    +   * @throws SecurityException if reflective access to the field is denied
    +   */
    +  private Getter<?, ?> generateGettersForField(Class<?> cls, String inputFieldName)
    +      throws NoSuchFieldException, SecurityException
    +  {
    +    java.lang.reflect.Field f = cls.getDeclaredField(inputFieldName);
    +    // Pass the wrapper type (e.g. Integer for int) so primitive fields are read as objects.
    +    Class<?> c = ClassUtils.primitiveToWrapper(f.getType());
    +    return PojoUtils.createGetter(cls, inputFieldName, c);
    +  }
    +
    +  /**
    +   * @param -
    +   *          schema of the generic record Assumption is that the name of a
    +   *          field in POJO is the same as the name in Avro schema
    +   */
    +
    +  public void initializeColumnMap(Schema schema)
    +  {
    +    columnNames = schema.getFields();
    +
    +    keyMethodMap = new ArrayList<Getter>();
    +    for (int i = 0; i < columnNames.size(); i++) {
    +      try {
    +        keyMethodMap.add(generateGettersForField(cls, columnNames.get(i).name()));
    +      } catch (NoSuchFieldException | SecurityException e) {
    +        throw new RuntimeException("Failed to initialize pojo class getters for field:
", e);
    +      }
    +    }
    +  }
    +
    +  @InputPortFieldAnnotation(optional = true, schemaRequired = true)
    +  public final transient DefaultInputPort<Object> data = new DefaultInputPort<Object>()
    +  {
    +
    +    @Override
    +    public void setup(PortContext context)
    +    {
    +      cls = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +
    +    @Override
    +    public void process(Object tuple)
    +    {
    +
    +      GenericRecord record = null;
    +
    +      try {
    +        record = getGenericRecord(tuple);
    +      } catch (Exception e) {
    +        LOG.error("Exception in parsing record");
    +        errorCnt++;
    +      }
    +
    +      if (record != null) {
    +        output.emit(record);
    +        recordCnt++;
    +      } else {
    +        errorCnt++;
    +      }
    +
    +    }
    +
    +  };
    +
    +  /**
    +   * @param -Object
    --- End diff --
    
    Please write a complete Javadoc comment: name and describe each parameter and document the return value.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message