Return-Path: X-Original-To: apmail-apex-dev-archive@minotaur.apache.org Delivered-To: apmail-apex-dev-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CFEBD181AF for ; Wed, 16 Mar 2016 08:45:37 +0000 (UTC) Received: (qmail 90090 invoked by uid 500); 16 Mar 2016 08:45:37 -0000 Delivered-To: apmail-apex-dev-archive@apex.apache.org Received: (qmail 90029 invoked by uid 500); 16 Mar 2016 08:45:37 -0000 Mailing-List: contact dev-help@apex.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.incubator.apache.org Delivered-To: mailing list dev@apex.incubator.apache.org Received: (qmail 89927 invoked by uid 99); 16 Mar 2016 08:45:37 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Mar 2016 08:45:37 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 3B7A51804A1 for ; Wed, 16 Mar 2016 08:45:37 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.349 X-Spam-Level: X-Spam-Status: No, score=-4.349 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.329] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id AHQc8aNNVkpg for ; Wed, 16 Mar 2016 08:45:35 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 668275FB3A for ; Wed, 16 Mar 2016 08:45:34 +0000 (UTC) Received: (qmail 89166 invoked by uid 99); 16 Mar 2016 08:45:34 -0000 Received: from arcas.apache.org 
(HELO arcas) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Mar 2016 08:45:33 +0000 Received: from arcas.apache.org (localhost [127.0.0.1]) by arcas (Postfix) with ESMTP id DA6242C1F69 for ; Wed, 16 Mar 2016 08:45:33 +0000 (UTC) Date: Wed, 16 Mar 2016 08:45:33 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: dev@apex.incubator.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (APEXMALHAR-2011) POJO to Avro record converter MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/APEXMALHAR-2011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15197032#comment-15197032 ] ASF GitHub Bot commented on APEXMALHAR-2011: -------------------------------------------- Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/211#discussion_r56298529 --- Diff: contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java --- @@ -0,0 +1,428 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package com.datatorrent.contrib.avro; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.StringTokenizer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.ClassUtils; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.FieldInfo.SupportType; +import com.datatorrent.lib.util.PojoUtils; + +/** + *

+ * AvroToPojo + *

+ * A generic implementation for conversion from Avro to POJO. The POJO class + * name & field mapping should be provided by the user. If this mapping is not + * provided then reflection is used to determine this mapping. As of now only + * primitive types are supported. + * + * @displayName Avro To Pojo + * @category Converter + * @tags avro + * @since 3.3.0 + */ + +@InterfaceStability.Evolving +public class AvroToPojo implements Operator +{ + + private List columnNames; + + private Class cls; + + private static final String FIELD_SEPARATOR = ":"; + private static final String RECORD_SEPARATOR = ","; + + private String genericRecordToPOJOFieldsMapping = null; + + public String getGenericRecordToPOJOFieldsMapping() + { + return genericRecordToPOJOFieldsMapping; + } + + public void setGenericRecordToPOJOFieldsMapping(String genericRecordToPOJOFieldsMapping) + { + this.genericRecordToPOJOFieldsMapping = genericRecordToPOJOFieldsMapping; + } + + private String schemaFile; + + private Schema schema; + + private List fieldInfos; + + private transient List columnFieldSetters = null; + + @AutoMetric + int recordCnt = 0; + + @AutoMetric + int errorCnt = 0; + + @AutoMetric + int fieldErrorCnt = 0; + + public void parseSchema() throws IOException + { + setSchema(new Schema.Parser().parse(getSchemaString())); + } + + public String getSchemaString() + { + return schemaFile; + } + + public void setSchemaString(String schemaFile) + { + this.schemaFile = schemaFile; + } + + public Schema getSchema() + { + return schema; + } + + public void setSchema(Schema schema) + { + this.schema = schema; + } + + public byte[] serialize(Object obj) throws IOException + { + try (ByteArrayOutputStream b = new ByteArrayOutputStream()) { + try (ObjectOutputStream o = new ObjectOutputStream(b)) { + o.writeObject(obj); + } + return b.toByteArray(); + } + } + + @InputPortFieldAnnotation(optional = false) + public final transient DefaultInputPort data = new DefaultInputPort() + { + + @Override + 
public void process(GenericRecord tuple) + { + + try { + Object obj = getPOJOFromGenericRecord(tuple, getCls()); + + if (obj != null) { + output.emit(obj); + recordCnt++; + } + + } catch (InstantiationException | IllegalAccessException e) { + LOG.error("Could not initialize object of class - " + getClass().getName()); + errorCnt++; + } + } + + }; + + @SuppressWarnings("unchecked") + public Object getPOJOFromGenericRecord(GenericRecord tuple, Class cls) + throws InstantiationException, IllegalAccessException + { + Object newObj = getCls().newInstance(); + + try { + + for (int i = 0; i < columnFieldSetters.size(); i++) { + + AvroToPojo.ActiveFieldInfo afi = columnFieldSetters.get(i); + + afi = columnFieldSetters.get(i); + + SupportType st = afi.fieldInfo.getType(); + + Object val = null; + + try { + val = tuple.get(afi.fieldInfo.getColumnName()); + } catch (Exception e) { + LOG.error("Could not find field -" + afi.fieldInfo.getColumnName() + "- in the generic record"); + val = null; + fieldErrorCnt++; + } + + if (val == null) { + continue; + } + + try { + + switch (st) { + + case BOOLEAN: + + afi.setterOrGetter = PojoUtils.createSetterBoolean(getCls(), afi.fieldInfo.getPojoFieldExpression()); + ((PojoUtils.SetterBoolean)afi.setterOrGetter).set(newObj, + (boolean)tuple.get(afi.fieldInfo.getColumnName())); + break; + + case DOUBLE: + afi.setterOrGetter = PojoUtils.createSetterDouble(getCls(), afi.fieldInfo.getPojoFieldExpression()); + ((PojoUtils.SetterDouble)afi.setterOrGetter).set(newObj, + (double)tuple.get(afi.fieldInfo.getColumnName())); + break; + + case FLOAT: + afi.setterOrGetter = PojoUtils.createSetterFloat(getCls(), afi.fieldInfo.getPojoFieldExpression()); + ((PojoUtils.SetterFloat)afi.setterOrGetter).set(newObj, + (float)tuple.get(afi.fieldInfo.getColumnName())); + break; + + case INTEGER: + afi.setterOrGetter = PojoUtils.createSetterInt(getCls(), afi.fieldInfo.getPojoFieldExpression()); + ((PojoUtils.SetterInt)afi.setterOrGetter).set(newObj, + 
(int)tuple.get(afi.fieldInfo.getColumnName())); + break; + + case STRING: + afi.setterOrGetter = PojoUtils.createSetter(getCls(), afi.fieldInfo.getPojoFieldExpression(), + afi.fieldInfo.getType().getJavaType()); + ((PojoUtils.Setter)afi.setterOrGetter).set(newObj, + new String(tuple.get(afi.fieldInfo.getColumnName()).toString())); + break; + + case LONG: + afi.setterOrGetter = PojoUtils.createSetterLong(getCls(), afi.fieldInfo.getPojoFieldExpression()); + ((PojoUtils.SetterLong)afi.setterOrGetter).set(newObj, + (long)tuple.get(afi.fieldInfo.getColumnName())); + break; + + default: + afi.setterOrGetter = PojoUtils.createSetter(getCls(), afi.fieldInfo.getPojoFieldExpression(), Byte.class); + ((PojoUtils.Setter)afi.setterOrGetter).set(newObj, + serialize(tuple.get(afi.fieldInfo.getColumnName()))); + break; + + } + + } catch (AvroRuntimeException | IOException e) { + LOG.error("Exception in setting value" + e.getMessage()); + fieldErrorCnt++; + } + + } + } catch (Exception ex) { + LOG.error("Generic Exception in setting value" + ex.getMessage()); + errorCnt++; + } + + return newObj; + } + + /** + * Use reflection to generate field info values if the user has not provided + * the inputs mapping + */ + + public String generateFieldInfoInputs(Class cls) + { + + java.lang.reflect.Field[] fields = cls.getDeclaredFields(); + + StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < fields.length; i++) { + + java.lang.reflect.Field f = fields[i]; + Class c = ClassUtils.primitiveToWrapper(f.getType()); + sb.append(f.getName() + FIELD_SEPARATOR + f.getName() + FIELD_SEPARATOR + c.getSimpleName().toUpperCase() + + RECORD_SEPARATOR); + + } + + return sb.substring(0, sb.length() - 1).toString(); + + } + + public List createFieldInfoMap(String str) + { + fieldInfos = new ArrayList(); + + StringTokenizer strtok = new StringTokenizer(str, RECORD_SEPARATOR); + + while (strtok.hasMoreTokens()) { + String[] token = strtok.nextToken().split(FIELD_SEPARATOR); + + 
fieldInfos.add(new FieldInfo(token[0], token[1], SupportType.valueOf(token[2]))); + } + + return fieldInfos; + } + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort output = new DefaultOutputPort() + { + public void setup(PortContext context) + { + cls = context.getValue(Context.PortContext.TUPLE_CLASS); + } + }; + + @Override + public void setup(OperatorContext context) + { + + columnFieldSetters = Lists.newArrayList(); + + try { + parseSchema(); + } catch (IOException e) { + LOG.error("Exception in parsing schema"); + } + + if (getFieldInfos().isEmpty() || getGenericRecordToPOJOFieldsMapping() == null) { + setFieldInfos(createFieldInfoMap(generateFieldInfoInputs(getCls()))); + } else { + setFieldInfos(createFieldInfoMap(getGenericRecordToPOJOFieldsMapping())); + } + + initColumnFieldSetters(getFieldInfos()); + + } + + @Override + public void teardown() + { + // TODO Auto-generated method stub + + } + + @Override + public void beginWindow(long windowId) + { + // TODO Auto-generated method stub + + } + + @Override + public void endWindow() + { + errorCnt = 0; + fieldErrorCnt = 0; + recordCnt = 0; + + } + + public List getColumnNames() + { + return columnNames; + } + + public void setColumnNames(List columnNames) + { + this.columnNames = columnNames; + } + + public Class getCls() --- End diff -- Why is this an externally required variable? > POJO to Avro record converter > ----------------------------- > > Key: APEXMALHAR-2011 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2011 > Project: Apache Apex Malhar > Issue Type: New Feature > Reporter: devendra tagare > > We are looking to develop a record converter which would take a POJO as an input and emit a Generic record as the output based on the given Avro schema. > The expected inputs for this operator would be, > 1.Class Name of the incoming POJO > 2.Avro schema for the Generic Record to emit. 
> This operator would receive an Object on its input port and emit a Generic record on the output port. > To start with, we would handle primitive types and then go on to handle complex types. > Thanks, > Dev -- This message was sent by Atlassian JIRA (v6.3.4#6332)