Date: Wed, 16 Mar 2016 08:48:33 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: dev@apex.incubator.apache.org
Subject: [jira] [Commented] (APEXMALHAR-2011) POJO to Avro record converter

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15197038#comment-15197038 ]

ASF GitHub Bot commented on APEXMALHAR-2011:
--------------------------------------------

Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/211#discussion_r56298721

    --- Diff: contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java ---
    @@ -0,0 +1,296 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.contrib.avro;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectOutputStream;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.avro.AvroRuntimeException;
    +import org.apache.avro.Schema;
    +import org.apache.avro.Schema.Field;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.api.Context.PortContext;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.lib.util.PojoUtils;
    +import com.datatorrent.lib.util.PojoUtils.Getter;
    +
    +/**
    + *
    + * PojoToAvro
    + *
    + * A generic implementation for POJO to Avro conversion. A POJO is converted to
    + * a GenericRecord based on the schema provided. As of now only primitive types
    + * are supported.
    + *
    + * @displayName Pojo To Avro
    + * @category Converter
    + * @tags avro
    + * @since 3.3.0
    + */
    +
    +@InterfaceStability.Evolving
    +public class PojoToAvro implements Operator
    +{
    +
    +  private List columnNames;
    +
    +  private Class cls;
    +
    +  private List keyMethodMap;
    +
    +  private String schemaFile;
    +
    +  private Schema schema;
    +
    +  @AutoMetric
    +  int recordCnt = 0;
    +
    +  @AutoMetric
    +  int errorCnt = 0;
    +
    +  @AutoMetric
    +  int fieldErrorCnt = 0;
    +
    +  public final transient DefaultOutputPort output = new DefaultOutputPort();
    +
    +  @Override
    +  public void setup(OperatorContext context)
    +  {
    +    try {
    +      parseSchema();
    +    } catch (IOException e) {
    +      LOG.error("Exception in parsing schema");
    +    }
    +    initializeColumnMap(getSchema());
    +  }
    +
    +  public void parseSchema() throws IOException
    +  {
    +    setSchema(new Schema.Parser().parse(getSchemaString()));
    +  }
    +
    +  public String getSchemaString()
    +  {
    +    return schemaFile;
    +  }
    +
    +  public void setSchemaString(String schemaFile)
    +  {
    +    this.schemaFile = schemaFile;
    +  }
    +
    +  public Schema getSchema()
    +  {
    +    return schema;
    +  }
    +
    +  public void setSchema(Schema schema)
    +  {
    +    this.schema = schema;
    +  }
    +
    +  public List getColumnNames()
    +  {
    +    return columnNames;
    +  }
    +
    +  public void setColumnNames(List columnNames)
    +  {
    +    this.columnNames = columnNames;
    +  }
    +
    +  public Class getCls()
    +  {
    +    return cls;
    +  }
    +
    +  public void setCls(Class cls)
    +  {
    +    this.cls = cls;
    +  }
    +
    +  public List getKeyMethodMap()
    +  {
    +    return keyMethodMap;
    +  }
    +
    +  public void setKeyMethodMap(List keyMethodMap)
    +  {
    +    this.keyMethodMap = keyMethodMap;
    +  }
    +
    +  /**
    +   * Adding this as a plug for being able to serialize non primitive types
    +   *
    +   * @param -
    +   *          object to serialize Returns a byte array
    +   */
    +
    +  public byte[] serialize(Object obj) throws IOException
    +  {
    +    try (ByteArrayOutputStream b = new ByteArrayOutputStream()) {
    +      try (ObjectOutputStream o = new ObjectOutputStream(b)) {
    +        o.writeObject(obj);
    +      }
    +      return b.toByteArray();
    +    }
    +  }
    +
    +  /**
    +   * @param -
    +   *          className
    +   * @param -
    +   *          name of the field to create the getter for Returns a getter
    +   */
    +
    +  private Getter generateGettersForField(Class cls, String inputFieldName)
    +    throws NoSuchFieldException, SecurityException
    +  {
    +    java.lang.reflect.Field f = cls.getDeclaredField(inputFieldName);
    +    Class c = ClassUtils.primitiveToWrapper(f.getType());
    +
    +    Getter classGetter = PojoUtils.createGetter(cls, inputFieldName, c);
    +
    +    return classGetter;
    +  }
    +
    +  /**
    +   * @param -
    +   *          schema of the generic record Assumption is that the name of a
    +   *          field in POJO is the same as the name in Avro schema
    +   */
    +
    +  public void initializeColumnMap(Schema schema)
    --- End diff --

    Why public?

> POJO to Avro record converter
> -----------------------------
>
>                 Key: APEXMALHAR-2011
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2011
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: devendra tagare
>
> We are looking to develop a record converter which would take a POJO as an input and emit a Generic record as the output based on the given Avro schema.
> The expected inputs for this operator would be,
> 1.Class Name of the incoming POJO
> 2.Avro schema for the Generic Record to emit.
> This operator would receive an Object on its input port and emit a Generic record on the output port.
> To start with, we would handle primitive types and then go on to handle complex types.
> Thanks,
> Dev

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
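
For readers following the thread: the Javadoc on initializeColumnMap states the assumption that each POJO field has the same name as the corresponding Avro schema field. The sketch below illustrates only that name-matching idea. It is not the code from the pull request: it uses plain java.lang.reflect instead of PojoUtils, a LinkedHashMap as a stand-in for an Avro GenericRecord, and a list of names in place of the schema's fields. PojoFieldMapper, SimpleOrder, and toRecord are hypothetical names introduced for illustration.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PojoFieldMapper
{
  // Hypothetical POJO used only for this illustration.
  public static class SimpleOrder
  {
    private final int id;
    private final String name;

    public SimpleOrder(int id, String name)
    {
      this.id = id;
      this.name = name;
    }
  }

  /**
   * Copies the named fields of a POJO into an ordered map.
   * Assumption: each entry in fieldNames is also the name of a field
   * declared on the POJO's class, mirroring the "same name in POJO and
   * schema" assumption discussed in the thread. The map stands in for
   * a GenericRecord; no Avro classes are used here.
   */
  public static Map<String, Object> toRecord(Object pojo, List<String> fieldNames)
    throws NoSuchFieldException, IllegalAccessException
  {
    Map<String, Object> record = new LinkedHashMap<>();
    for (String fieldName : fieldNames) {
      // Look the field up by name; a mismatch surfaces as NoSuchFieldException.
      Field f = pojo.getClass().getDeclaredField(fieldName);
      f.setAccessible(true);
      // Primitive values are boxed by Field.get (int becomes Integer).
      record.put(fieldName, f.get(pojo));
    }
    return record;
  }
}
```

Calling PojoFieldMapper.toRecord(new PojoFieldMapper.SimpleOrder(7, "widget"), Arrays.asList("id", "name")) yields a map with the keys id and name in that order. A name in the list that has no matching field fails with NoSuchFieldException, which is the kind of per-field failure an operator like this would presumably count rather than propagate.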