flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Porritt, James" <James.Porr...@uk.mlp.com>
Subject RE: AvroInputFormat NullPointerException issues
Date Tue, 17 Jul 2018 15:13:50 GMT
I got to the bottom of this – it was a namespace issue. My schema was:
{
  "type" : "record",
  "name" : "MyAvroSchema",
  "fields" : [ {
    "name" : "a",
    "type" : [ "null", "int" ]
  }, {
    "name" : "b",
    "type" : [ "null", "string" ]
  }]
}
But I had actually put the generated MyAvroSchema class into the ‘my_stats’ Java package
(along with my other application code) by adding a ‘package my_stats;’ line at the top,
while the schema itself declared no namespace. When I added “namespace”: “my_stats” to the
schema and regenerated the Java class from it, everything worked.

From: Porritt, James <James.Porritt@uk.mlp.com>
Sent: 17 July 2018 15:10
To: 'vino yang' <yanghua1127@gmail.com>
Cc: user@flink.apache.org
Subject: RE: AvroInputFormat NullPointerException issues

My MyAvroSchema class is as follows. It was generated using avro-tools:

/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/

import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

/**
 * Avro specific-record class generated by avro-tools for the record schema
 * "MyAvroSchema", which has two nullable fields: {@code a} (union of null and int)
 * and {@code b} (union of null and string).
 *
 * <p>NOTE: {@link #SCHEMA$} declares no "namespace", and this class has no package
 * declaration. In the mailing-list thread this class comes from, moving the class into
 * package {@code my_stats} (by adding {@code package my_stats;}) without also adding
 * {@code "namespace": "my_stats"} to the schema caused a NullPointerException in
 * {@code SpecificData.createSchema} (via {@code String.replace}) when
 * {@code AvroInputFormat} opened. Keep the schema namespace and the Java package in
 * sync, and regenerate this file instead of editing it by hand.
 */
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class MyAvroSchema extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord
{
  private static final long serialVersionUID = 4994916517880671663L;
  // Parsed record schema. It has no "namespace" attribute, so the record's name is the
  // bare "MyAvroSchema" (see the class-level note on package/namespace agreement).
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MyAvroSchema\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"int\"]},{\"name\":\"b\",\"type\":[\"null\",\"string\"]}]}");
  /** Returns the schema this class was generated from ({@link #SCHEMA$}). */
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  // SpecificData model shared by ENCODER, DECODER, createDecoder, WRITER$ and READER$.
  private static SpecificData MODEL$ = new SpecificData();

  // Single-object encoder/decoder used by toByteBuffer() / fromByteBuffer().
  private static final BinaryMessageEncoder<MyAvroSchema> ENCODER =
      new BinaryMessageEncoder<MyAvroSchema>(MODEL$, SCHEMA$);

  private static final BinaryMessageDecoder<MyAvroSchema> DECODER =
      new BinaryMessageDecoder<MyAvroSchema>(MODEL$, SCHEMA$);

  /**
   * Return the BinaryMessageDecoder instance used by this class.
   */
  public static BinaryMessageDecoder<MyAvroSchema> getDecoder() {
    return DECODER;
  }

  /**
   * Create a new BinaryMessageDecoder instance for this class that uses the specified
   * {@link SchemaStore}.
   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
   */
  public static BinaryMessageDecoder<MyAvroSchema> createDecoder(SchemaStore resolver)
{
    return new BinaryMessageDecoder<MyAvroSchema>(MODEL$, SCHEMA$, resolver);
  }

  /** Serializes this MyAvroSchema to a ByteBuffer. */
  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
    return ENCODER.encode(this);
  }

  /** Deserializes a MyAvroSchema from a ByteBuffer. */
  public static MyAvroSchema fromByteBuffer(
      java.nio.ByteBuffer b) throws java.io.IOException {
    return DECODER.decode(b);
  }

  // Record fields; both may be null because the schema types are unions with "null".
  // Prefer the getters/setters or the Builder over direct field access.
  @Deprecated public java.lang.Integer a;
  @Deprecated public java.lang.CharSequence b;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>.
   */
  public MyAvroSchema() {}

  /**
   * All-args constructor.
   * @param a The new value for a
   * @param b The new value for b
   */
  public MyAvroSchema(java.lang.Integer a, java.lang.CharSequence b) {
    this.a = a;
    this.b = b;
  }

  /** Returns {@link #SCHEMA$}; the instance-level counterpart of getClassSchema(). */
  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call.
  // Index 0 is field 'a' and index 1 is field 'b', matching the field order in SCHEMA$.
  public java.lang.Object get(int field$) {
    switch (field$) {
    case 0: return a;
    case 1: return b;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  // Used by DatumReader.  Applications should not call.
  // Same index-to-field mapping as get(int); the value is cast to the field's Java type.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    case 0: a = (java.lang.Integer)value$; break;
    case 1: b = (java.lang.CharSequence)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  /**
   * Gets the value of the 'a' field.
   * @return The value of the 'a' field.
   */
  public java.lang.Integer getA() {
    return a;
  }

  /**
   * Sets the value of the 'a' field.
   * @param value the value to set.
   */
  public void setA(java.lang.Integer value) {
    this.a = value;
  }

  /**
   * Gets the value of the 'b' field.
   * @return The value of the 'b' field.
   */
  public java.lang.CharSequence getB() {
    return b;
  }

  /**
   * Sets the value of the 'b' field.
   * @param value the value to set.
   */
  public void setB(java.lang.CharSequence value) {
    this.b = value;
  }

  /**
   * Creates a new MyAvroSchema RecordBuilder.
   * @return A new MyAvroSchema RecordBuilder
   */
  public static MyAvroSchema.Builder newBuilder() {
    return new MyAvroSchema.Builder();
  }

  /**
   * Creates a new MyAvroSchema RecordBuilder by copying an existing Builder.
   * @param other The existing builder to copy.
   * @return A new MyAvroSchema RecordBuilder
   */
  public static MyAvroSchema.Builder newBuilder(MyAvroSchema.Builder other) {
    return new MyAvroSchema.Builder(other);
  }

  /**
   * Creates a new MyAvroSchema RecordBuilder by copying an existing MyAvroSchema instance.
   * @param other The existing instance to copy.
   * @return A new MyAvroSchema RecordBuilder
   */
  public static MyAvroSchema.Builder newBuilder(MyAvroSchema other) {
    return new MyAvroSchema.Builder(other);
  }

  /**
   * RecordBuilder for MyAvroSchema instances.
   * Each field has a "set" flag in fieldSetFlags(); build() falls back to the schema
   * default for any field whose flag is not set.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<MyAvroSchema>
    implements org.apache.avro.data.RecordBuilder<MyAvroSchema> {

    private java.lang.Integer a;
    private java.lang.CharSequence b;

    /** Creates a new Builder */
    private Builder() {
      super(SCHEMA$);
    }

    /**
     * Creates a Builder by copying an existing Builder.
     * Each field that passes isValidValue is deep-copied and marked as set.
     * @param other The existing Builder to copy.
     */
    private Builder(MyAvroSchema.Builder other) {
      super(other);
      if (isValidValue(fields()[0], other.a)) {
        this.a = data().deepCopy(fields()[0].schema(), other.a);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.b)) {
        this.b = data().deepCopy(fields()[1].schema(), other.b);
        fieldSetFlags()[1] = true;
      }
    }

    /**
     * Creates a Builder by copying an existing MyAvroSchema instance
     * Each field that passes isValidValue is deep-copied and marked as set.
     * @param other The existing instance to copy.
     */
    private Builder(MyAvroSchema other) {
            super(SCHEMA$);
      if (isValidValue(fields()[0], other.a)) {
        this.a = data().deepCopy(fields()[0].schema(), other.a);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.b)) {
        this.b = data().deepCopy(fields()[1].schema(), other.b);
        fieldSetFlags()[1] = true;
      }
    }

    /**
      * Gets the value of the 'a' field.
      * @return The value.
      */
    public java.lang.Integer getA() {
      return a;
    }

    /**
      * Sets the value of the 'a' field.
      * The value is checked with validate() against field 0 before it is stored.
      * @param value The value of 'a'.
      * @return This builder.
      */
    public MyAvroSchema.Builder setA(java.lang.Integer value) {
      validate(fields()[0], value);
      this.a = value;
      fieldSetFlags()[0] = true;
      return this;
    }

    /**
      * Checks whether the 'a' field has been set.
      * @return True if the 'a' field has been set, false otherwise.
      */
    public boolean hasA() {
      return fieldSetFlags()[0];
    }


    /**
      * Clears the value of the 'a' field.
      * @return This builder.
      */
    public MyAvroSchema.Builder clearA() {
      a = null;
      fieldSetFlags()[0] = false;
      return this;
    }

    /**
      * Gets the value of the 'b' field.
      * @return The value.
      */
    public java.lang.CharSequence getB() {
      return b;
    }

    /**
      * Sets the value of the 'b' field.
      * The value is checked with validate() against field 1 before it is stored.
      * @param value The value of 'b'.
      * @return This builder.
      */
    public MyAvroSchema.Builder setB(java.lang.CharSequence value) {
      validate(fields()[1], value);
      this.b = value;
      fieldSetFlags()[1] = true;
      return this;
    }

    /**
      * Checks whether the 'b' field has been set.
      * @return True if the 'b' field has been set, false otherwise.
      */
    public boolean hasB() {
      return fieldSetFlags()[1];
    }


    /**
      * Clears the value of the 'b' field.
      * @return This builder.
      */
    public MyAvroSchema.Builder clearB() {
      b = null;
      fieldSetFlags()[1] = false;
      return this;
    }

    /**
     * Builds a new MyAvroSchema. A field that was set on this builder takes the builder's
     * value; otherwise it takes defaultValue() for that schema field. Any exception
     * raised while building is wrapped in an AvroRuntimeException.
     */
    @Override
    @SuppressWarnings("unchecked")
    public MyAvroSchema build() {
      try {
        MyAvroSchema record = new MyAvroSchema();
        record.a = fieldSetFlags()[0] ? this.a : (java.lang.Integer) defaultValue(fields()[0]);
        record.b = fieldSetFlags()[1] ? this.b : (java.lang.CharSequence) defaultValue(fields()[1]);
        return record;
      } catch (java.lang.Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }

  // Datum writer created from MODEL$ for SCHEMA$; used by writeExternal.
  @SuppressWarnings("unchecked")
  private static final org.apache.avro.io.DatumWriter<MyAvroSchema>
    WRITER$ = (org.apache.avro.io.DatumWriter<MyAvroSchema>)MODEL$.createDatumWriter(SCHEMA$);

  // Writes this record with WRITER$ through an Avro encoder wrapping the ObjectOutput.
  @Override public void writeExternal(java.io.ObjectOutput out)
    throws java.io.IOException {
    WRITER$.write(this, SpecificData.getEncoder(out));
  }

  // Datum reader created from MODEL$ for SCHEMA$; used by readExternal.
  @SuppressWarnings("unchecked")
  private static final org.apache.avro.io.DatumReader<MyAvroSchema>
    READER$ = (org.apache.avro.io.DatumReader<MyAvroSchema>)MODEL$.createDatumReader(SCHEMA$);

 // Reads field values into this record with READER$ through an Avro decoder wrapping the ObjectInput.
 @Override public void readExternal(java.io.ObjectInput in)
    throws java.io.IOException {
    READER$.read(this, SpecificData.getDecoder(in));
  }

}

I will check out the other suggestions you make. One concern I have is that from the stacktrace
I posted it doesn’t actually look like the custom class is being called.

From: vino yang <yanghua1127@gmail.com>
Sent: 17 July 2018 05:49
To: Porritt, James <James.Porritt@uk.mlp.com>
Cc: user@flink.apache.org
Subject: Re: AvroInputFormat NullPointerException issues

Hi Porritt,

Based on the exception stack trace you provided, the exception seems to occur while the
Avro schema is being initialized. You did not include the definition of the MyAvroSchema class,
so I'd suggest that you:

1. make sure the file path "file:///home/myuser/test.avro" exists on the TaskManager node that
runs your source task;
2. refer to the flink-avro connector documentation [1];
3. look at the existing test cases, such as [2]; with small changes, you can use them to test
your program.

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/connectors.html#avro-support-in-flink
[2]: https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java

---
thanks.
vino.

2018-07-16 20:22 GMT+08:00 Porritt, James <James.Porritt@uk.mlp.com<mailto:James.Porritt@uk.mlp.com>>:
I’ve been trying to use the following code:

        // Reads MyAvroSchema records from an Avro file and prints them.
        // NOTE: in the DataSet API, print() is an eager sink: it triggers job execution
        // itself. Calling env.execute() afterwards finds no new data sinks and throws a
        // RuntimeException, so no explicit execute() call is made here.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Path path = new Path("file:///home/myuser/test.avro");
        AvroInputFormat<MyAvroSchema> avroFormat = new AvroInputFormat<>(path, MyAvroSchema.class);
        DataSet<MyAvroSchema> records = env.createInput(avroFormat);
        records.print();

to utilise this avro schema:

{
  "type" : "record",
  "name" : "MyAvroSchema",
  "fields" : [ {
    "name" : "a",
    "type" : [ "null", "int" ]
  }, {
    "name" : "b",
    "type" : [ "null", "string" ]
  }]
}

I created the MyAvroSchema class from this schema using avro tools. I also converted the following
JSON into a compatible avro stored in file:///home/myuser/test.avro

{"a":{"int":123},"b":{"string":"hello"}}

When I try to run this, however, I get:

2018-07-16 12:59:26,761 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
  - DataSource (at createInput(ExecutionEnvironment.java:548) (org.apache.flink.formats.avro.AvroInputFormat)
) (1/1) (302878b522f420f6b7866de4f32fcbd6) switched from RUNNING to FAILED.
org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.NullPointerException
        at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)
        at org.apache.avro.specific.SpecificDatumReader.<init>(SpecificDatumReader.java:37)
        at org.apache.flink.formats.avro.AvroInputFormat.initReader(AvroInputFormat.java:122)
        at org.apache.flink.formats.avro.AvroInputFormat.open(AvroInputFormat.java:111)
        at org.apache.flink.formats.avro.AvroInputFormat.open(AvroInputFormat.java:54)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:748)
Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
        at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
        at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
        at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
        at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
        at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
        ... 7 more
Caused by: java.lang.NullPointerException
        at java.lang.String.replace(String.java:2239)
        at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:281)
        at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
        at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
        at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
        at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
        at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
        at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
        ... 11 more

Can anyone suggest what might be causing the NullPointerException? I’m using flink-1.5.0
and avro-tools-1.8.2

######################################################################
The information contained in this communication is confidential and
intended only for the individual(s) named above. If you are not a named
addressee, please notify the sender immediately and delete this email
from your system and do not disclose the email or any part of it to any
person. The views expressed in this email are the views of the author
and do not necessarily represent the views of Millennium Capital Partners
LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic
communications of MCP LLP and its affiliates, including telephone
communications, may be electronically archived and subject to review
and/or disclosure to someone other than the recipient. MCP LLP is
authorized and regulated by the Financial Conduct Authority. Millennium
Capital Partners LLP is a limited liability partnership registered in
England & Wales with number OC312897 and with its registered office at
50 Berkeley Street, London, W1J 8HD.
######################################################################

######################################################################
The information contained in this communication is confidential and
intended only for the individual(s) named above. If you are not a named
addressee, please notify the sender immediately and delete this email
from your system and do not disclose the email or any part of it to any
person. The views expressed in this email are the views of the author
and do not necessarily represent the views of Millennium Capital Partners
LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic
communications of MCP LLP and its affiliates, including telephone
communications, may be electronically archived and subject to review
and/or disclosure to someone other than the recipient. MCP LLP is
authorized and regulated by the Financial Conduct Authority. Millennium
Capital Partners LLP is a limited liability partnership registered in
England & Wales with number OC312897 and with its registered office at
50 Berkeley Street, London, W1J 8HD.
######################################################################

######################################################################

The information contained in this communication is confidential and

intended only for the individual(s) named above. If you are not a named

addressee, please notify the sender immediately and delete this email

from your system and do not disclose the email or any part of it to any

person. The views expressed in this email are the views of the author

and do not necessarily represent the views of Millennium Capital Partners

LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic

communications of MCP LLP and its affiliates, including telephone

communications, may be electronically archived and subject to review

and/or disclosure to someone other than the recipient. MCP LLP is

authorized and regulated by the Financial Conduct Authority. Millennium

Capital Partners LLP is a limited liability partnership registered in

England & Wales with number OC312897 and with its registered office at

50 Berkeley Street, London, W1J 8HD.

######################################################################
Mime
View raw message