apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2011) POJO to Avro record converter
Date Wed, 30 Mar 2016 06:29:25 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15217492#comment-15217492
] 

ASF GitHub Bot commented on APEXMALHAR-2011:
--------------------------------------------

Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/211#discussion_r57841961
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
---
    @@ -0,0 +1,164 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.avro;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.avro.AvroRuntimeException;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.DatumReader;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
    +
    +/**
    + * <p>
    + * Avro File Input Operator
    + * </p>
    + * A specific implementation of the AbstractFileInputOperator to read Avro
    + * container files.<br>
    + * No need to provide a schema; it's inferred from the file<br>
    + * Users can add the {@link IdempotentStorageManager.FSIdempotentStorageManager}
    + * to ensure exactly once semantics with a HDFS backed WAL.
    + * 
    + * @displayName AvroFileInputOperator
    + * @category Input
    + * @tags fs, file,avro, input operator
    + */
    +@InterfaceStability.Evolving
    +public class AvroFileInputOperator extends AbstractFileInputOperator<GenericRecord>
    +{
    +
    +  private transient long offset = 0L;
    +
    +  @AutoMetric
    +  int recordCnt = 0;
    +
    +  @AutoMetric
    +  int errorCnt = 0;
    +
    +  private transient DataFileStream<GenericRecord> avroDataStream;
    +  private transient GenericRecord record = null;
    +
    +  public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<GenericRecord>();
    +
    +  public final transient DefaultOutputPort<String> completedFilesPort = new DefaultOutputPort<String>();
    +
    +  public final transient DefaultOutputPort<String> errorRecordsPort = new DefaultOutputPort<String>();
    +
    +  /**
    +   * Returns a input stream given a file path
    +   * 
    +   * @param path
    +   * @return InputStream
    +   * @throws IOException
    +   */
    +  @Override
    +  protected InputStream openFile(Path path) throws IOException
    +  {
    +    InputStream is = super.openFile(path);
    +    if (is != null) {
    +      try {
    +        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
    +        avroDataStream = new DataFileStream<GenericRecord>(is, datumReader);
    +        datumReader.setSchema(avroDataStream.getSchema());
    +      } catch (NullPointerException npe) {
    +        LOG.error("Schemaless file", npe);
    +        throw new NullPointerException();
    +      }
    +    }
    +    return is;
    +  }
    +
    +  @Override
    +  protected GenericRecord readEntity() throws IOException
    +  {
    +    return readRecord();
    +  }
    +
    +  /**
    +   * Reads a GenericRecord from the given input stream<br>
    +   * Emits the FileName,Offset,Exception on the error port if its connected
    +   * 
    +   * @return GenericRecord
    +   */
    +  private GenericRecord readRecord() throws IOException
    +  {
    +    record = null;
    +
    +    try {
    +      if (avroDataStream != null && avroDataStream.hasNext()) {
    +        offset++;
    +
    +        record = avroDataStream.next();
    +        recordCnt++;
    +        return record;
    +      }
    +    } catch (AvroRuntimeException are) {
    +      LOG.error("Exception in parsing record for file - " + super.currentFile + " at offset - " + offset, are);
    +      if (errorRecordsPort.isConnected()) {
    +        errorRecordsPort.emit("FileName:" + super.currentFile + ", Offset:" + offset);
    +      }
    +      errorCnt++;
    +      throw new AvroRuntimeException(are);
    +    }
    +    return record;
    +  }
    +
    +  @Override
    +  protected void closeFile(InputStream is) throws IOException
    +  {
    +    String fileName = super.currentFile;
    +    super.closeFile(is);
    --- End diff --
    
    Shouldn't super.closeFile be done later and avroDataStream.close() first? This is because
avroDataStream is created from super.inputStream.


> POJO to Avro record converter
> -----------------------------
>
>                 Key: APEXMALHAR-2011
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2011
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: devendra tagare
>
> We are looking to develop a record converter which would take a POJO as an input and
emit a Generic record as the output based on the given Avro schema.
> The expected inputs for this operator would be,
> 1.Class Name of the incoming POJO
> 2.Avro schema for the Generic Record to emit.
> This operator would receive an Object on its input port and emit a Generic record on
the output port.
> To start with, we would handle primitive types and then go on to handle complex types.
> Thanks,
> Dev



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message