apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From devtagare <...@git.apache.org>
Subject [GitHub] incubator-apex-malhar pull request: APEXMALHAR-2011-2012 Avro to P...
Date Wed, 30 Mar 2016 18:04:29 GMT
Github user devtagare commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/211#discussion_r57934935
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
---
    @@ -0,0 +1,164 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.avro;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.avro.AvroRuntimeException;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.DatumReader;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
    +
    +/**
    + * <p>
    + * Avro File Input Operator
    + * </p>
    + * A specific implementation of the AbstractFileInputOperator to read Avro
    + * container files.<br>
    + * No need to provide a schema; it is inferred from the file.<br>
    + * Users can add the {@link IdempotentStorageManager.FSIdempotentStorageManager}
    + * to ensure exactly once semantics with a HDFS backed WAL.
    + * 
    + * @displayName AvroFileInputOperator
    + * @category Input
    + * @tags fs, file, avro, input operator
    + */
    +@InterfaceStability.Evolving
    +public class AvroFileInputOperator extends AbstractFileInputOperator<GenericRecord>
    +{
    +
    +  private transient long offset = 0L;
    +
    +  @AutoMetric
    +  int recordCnt = 0;
    +
    +  @AutoMetric
    +  int errorCnt = 0;
    +
    +  private transient DataFileStream<GenericRecord> avroDataStream;
    +  private transient GenericRecord record = null;
    +
    +  public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<GenericRecord>();
    +
    +  public final transient DefaultOutputPort<String> completedFilesPort = new DefaultOutputPort<String>();
    +
    +  public final transient DefaultOutputPort<String> errorRecordsPort = new DefaultOutputPort<String>();
    +
    +  /**
    +   * Opens the file at the given path through the parent operator and, when a
    +   * stream is returned, wraps it in an Avro {@link DataFileStream} that is
    +   * kept in {@code avroDataStream} for subsequent record reads. The reader
    +   * schema is taken from the stream itself.
    +   * 
    +   * @param path
    +   *          path of the Avro container file to open
    +   * @return the input stream returned by the parent operator; when it is
    +   *         null no Avro stream is created
    +   * @throws IOException
    +   *           propagated from the parent operator or from constructing the
    +   *           Avro stream
    +   * @throws NullPointerException
    +   *           if setting up the Avro reader fails with a null reference
    +   *           (logged as a schemaless file)
    +   */
    +  @Override
    +  protected InputStream openFile(Path path) throws IOException
    +  {
    +    InputStream is = super.openFile(path);
    +    if (is != null) {
    +      try {
    +        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
    +        avroDataStream = new DataFileStream<GenericRecord>(is, datumReader);
    +        datumReader.setSchema(avroDataStream.getSchema());
    +      } catch (NullPointerException npe) {
    +        LOG.error("Schemaless file", npe);
    +        // Rethrow the caught exception rather than a fresh, message-less one so
    +        // the original stack trace reaches the caller.
    +        throw npe;
    +      }
    +    }
    +    return is;
    +  }
    +
    +  @Override
    +  protected GenericRecord readEntity() throws IOException
    +  {
    +    return readRecord();
    +  }
    +
    +  /**
    +   * Reads the next GenericRecord from the Avro stream of the current file.<br>
    +   * If Avro fails while reading, the failure is logged, the file name and
    +   * offset are emitted on the error port if it is connected, the error count
    +   * is incremented and the failure is rethrown wrapped in an
    +   * AvroRuntimeException.
    +   * 
    +   * @return the next record, or null when no Avro stream is open or the
    +   *         stream has no further records
    +   * @throws IOException
    +   *           declared for callers; not thrown directly by this method
    +   */
    +  private GenericRecord readRecord() throws IOException
    +  {
    +    record = null;
    +
    +    try {
    +      if (avroDataStream != null && avroDataStream.hasNext()) {
    +        // offset counts records attempted in the current file, so it is
    +        // advanced before the read that may fail
    +        offset++;
    +
    +        record = avroDataStream.next();
    +        recordCnt++;
    +        return record;
    +      }
    +    } catch (AvroRuntimeException are) {
    +      LOG.error("Exception in parsing record for file - " + super.currentFile
    +          + " at offset - " + offset, are);
    +      if (errorRecordsPort.isConnected()) {
    +        errorRecordsPort.emit("FileName:" + super.currentFile + ", Offset:" + offset);
    +      }
    +      errorCnt++;
    +      throw new AvroRuntimeException(are);
    +    }
    +    return record;
    +  }
    +
    +  /**
    +   * Closes the current file and the Avro stream wrapped around it, then
    +   * emits the file name on the completed-files port if it is connected.
    +   * The Avro stream is closed and the record offset reset even when the
    +   * parent operator fails to close the file.
    +   * 
    +   * @param is
    +   *          the input stream handed back by the parent operator
    +   * @throws IOException
    +   *           if closing the file or the Avro stream fails
    +   */
    +  @Override
    +  protected void closeFile(InputStream is) throws IOException
    +  {
    +    // Captured before delegating; presumably super.closeFile clears
    +    // currentFile - confirm against AbstractFileInputOperator.
    +    String fileName = super.currentFile;
    +    try {
    +      super.closeFile(is);
    +    } finally {
    +      if (avroDataStream != null) {
    +        try {
    +          avroDataStream.close();
    +        } finally {
    +          // Drop the closed stream so readRecord() cannot reuse it if the
    +          // next openFile() does not create a new one.
    +          avroDataStream = null;
    +        }
    +      }
    +      offset = 0;
    +    }
    +    if (completedFilesPort.isConnected()) {
    +      completedFilesPort.emit(fileName);
    +    }
    +  }
    +
    +  /**
    +   * Forwards a record to the {@code output} port; null tuples are dropped.
    +   * 
    +   * @param tuple
    +   *          record to emit, may be null
    +   */
    +  @Override
    +  protected void emit(GenericRecord tuple)
    +  {
    +    if (tuple == null) {
    +      return;
    +    }
    +    output.emit(tuple);
    +  }
    +
    +  @Override
    +  public void endWindow()
    +  {
    +    errorCnt = 0;
    --- End diff --
    
    Noticed that AutoMetric variables are not marked private in several operators; see CsvParser and
JsonParser. If made private, they will not be accessible in the test class unless getters are provided.
    Has the usage changed since they were first implemented?
    beginWindow seems a better place for resetting the values, so I am moving it there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message