flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino yang <yanghua1...@gmail.com>
Subject Re: AvroInputFormat NullPointerException issues
Date Tue, 17 Jul 2018 15:17:17 GMT
Hi Porritt,

OK, it looks good.

Thanks, vino.

2018-07-17 23:13 GMT+08:00 Porritt, James <James.Porritt@uk.mlp.com>:

> I got to the bottom of this – it was a namespace issue. My schema was:
>
> {
>
>   "type" : "record",
>
>   "name" : "MyAvroSchema",
>
>   "fields" : [ {
>
>     "name" : "a",
>
>     "type" : [ "null", "int" ]
>
>   }, {
>
>     "name" : "b",
>
>     "type" : [ "null", "string" ]
>
>   }]
>
> }
>
> But actually, I was putting the generated MyAvroSchema class into the
> ‘my_stats’ Java package (along with my other application code) by adding a
> ‘package my_stats;’ line at the top, so the class's package no longer matched
> the schema, which declared no namespace. When I added "namespace": "my_stats"
> to the schema and regenerated the Java class from it, it was fine.
>
>
>
> *From:* Porritt, James <James.Porritt@uk.mlp.com>
> *Sent:* 17 July 2018 15:10
> *To:* 'vino yang' <yanghua1127@gmail.com>
> *Cc:* user@flink.apache.org
> *Subject:* RE: AvroInputFormat NullPointerException issues
>
>
>
> My MyAvroSchema class is as follows. It was generated using avro-tools:
>
>
>
> /**
>
> * Autogenerated by Avro
>
> *
>
> * DO NOT EDIT DIRECTLY
>
> */
>
>
>
> import org.apache.avro.specific.SpecificData;
>
> import org.apache.avro.message.BinaryMessageEncoder;
>
> import org.apache.avro.message.BinaryMessageDecoder;
>
> import org.apache.avro.message.SchemaStore;
>
>
>
> @SuppressWarnings("all")
>
> @org.apache.avro.specific.AvroGenerated
>
> public class MyAvroSchema extends org.apache.avro.specific.SpecificRecordBase
> implements org.apache.avro.specific.SpecificRecord {
>
>   private static final long serialVersionUID = 4994916517880671663L;
>
>   public static final org.apache.avro.Schema SCHEMA$ = new
> org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",
> \"name\":\"MyAvroSchema\",\"fields\":[{\"name\":\"a\",\"
> type\":[\"null\",\"int\"]},{\"name\":\"b\",\"type\":[\"null\
> ",\"string\"]}]}");
>
>   public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
>
>
>
>   private static SpecificData MODEL$ = new SpecificData();
>
>
>
>   private static final BinaryMessageEncoder<MyAvroSchema> ENCODER =
>
>       new BinaryMessageEncoder<MyAvroSchema>(MODEL$, SCHEMA$);
>
>
>
>   private static final BinaryMessageDecoder<MyAvroSchema> DECODER =
>
>       new BinaryMessageDecoder<MyAvroSchema>(MODEL$, SCHEMA$);
>
>
>
>   /**
>
>    * Return the BinaryMessageDecoder instance used by this class.
>
>    */
>
>   public static BinaryMessageDecoder<MyAvroSchema> getDecoder() {
>
>     return DECODER;
>
>   }
>
>
>
>   /**
>
>    * Create a new BinaryMessageDecoder instance for this class that uses
> the specified {@link SchemaStore}.
>
>    * @param resolver a {@link SchemaStore} used to find schemas by
> fingerprint
>
>    */
>
>   public static BinaryMessageDecoder<MyAvroSchema>
> createDecoder(SchemaStore resolver) {
>
>     return new BinaryMessageDecoder<MyAvroSchema>(MODEL$, SCHEMA$,
> resolver);
>
>   }
>
>
>
>   /** Serializes this MyAvroSchema to a ByteBuffer. */
>
>   public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
>
>     return ENCODER.encode(this);
>
>   }
>
>
>
>   /** Deserializes a MyAvroSchema from a ByteBuffer. */
>
>   public static MyAvroSchema fromByteBuffer(
>
>       java.nio.ByteBuffer b) throws java.io.IOException {
>
>     return DECODER.decode(b);
>
>   }
>
>
>
>   @Deprecated public java.lang.Integer a;
>
>   @Deprecated public java.lang.CharSequence b;
>
>
>
>   /**
>
>    * Default constructor.  Note that this does not initialize fields
>
>    * to their default values from the schema.  If that is desired then
>
>    * one should use <code>newBuilder()</code>.
>
>    */
>
>   public MyAvroSchema() {}
>
>
>
>   /**
>
>    * All-args constructor.
>
>    * @param a The new value for a
>
>    * @param b The new value for b
>
>    */
>
>   public MyAvroSchema(java.lang.Integer a, java.lang.CharSequence b) {
>
>     this.a = a;
>
>     this.b = b;
>
>   }
>
>
>
>   public org.apache.avro.Schema getSchema() { return SCHEMA$; }
>
>   // Used by DatumWriter.  Applications should not call.
>
>   public java.lang.Object get(int field$) {
>
>     switch (field$) {
>
>     case 0: return a;
>
>     case 1: return b;
>
>     default: throw new org.apache.avro.AvroRuntimeException("Bad index");
>
>     }
>
>   }
>
>
>
>   // Used by DatumReader.  Applications should not call.
>
>   @SuppressWarnings(value="unchecked")
>
>   public void put(int field$, java.lang.Object value$) {
>
>     switch (field$) {
>
>     case 0: a = (java.lang.Integer)value$; break;
>
>     case 1: b = (java.lang.CharSequence)value$; break;
>
>     default: throw new org.apache.avro.AvroRuntimeException("Bad index");
>
>     }
>
>   }
>
>
>
>   /**
>
>    * Gets the value of the 'a' field.
>
>    * @return The value of the 'a' field.
>
>    */
>
>   public java.lang.Integer getA() {
>
>     return a;
>
>   }
>
>
>
>   /**
>
>    * Sets the value of the 'a' field.
>
>    * @param value the value to set.
>
>    */
>
>   public void setA(java.lang.Integer value) {
>
>     this.a = value;
>
>   }
>
>
>
>   /**
>
>    * Gets the value of the 'b' field.
>
>    * @return The value of the 'b' field.
>
>    */
>
>   public java.lang.CharSequence getB() {
>
>     return b;
>
>   }
>
>
>
>   /**
>
>    * Sets the value of the 'b' field.
>
>    * @param value the value to set.
>
>    */
>
>   public void setB(java.lang.CharSequence value) {
>
>     this.b = value;
>
>   }
>
>
>
>   /**
>
>    * Creates a new MyAvroSchema RecordBuilder.
>
>    * @return A new MyAvroSchema RecordBuilder
>
>    */
>
>   public static MyAvroSchema.Builder newBuilder() {
>
>     return new MyAvroSchema.Builder();
>
>   }
>
>
>
>   /**
>
>    * Creates a new MyAvroSchema RecordBuilder by copying an existing
> Builder.
>
>    * @param other The existing builder to copy.
>
>    * @return A new MyAvroSchema RecordBuilder
>
>    */
>
>   public static MyAvroSchema.Builder newBuilder(MyAvroSchema.Builder
> other) {
>
>     return new MyAvroSchema.Builder(other);
>
>   }
>
>
>
>   /**
>
>    * Creates a new MyAvroSchema RecordBuilder by copying an existing
> MyAvroSchema instance.
>
>    * @param other The existing instance to copy.
>
>    * @return A new MyAvroSchema RecordBuilder
>
>    */
>
>   public static MyAvroSchema.Builder newBuilder(MyAvroSchema other) {
>
>     return new MyAvroSchema.Builder(other);
>
>   }
>
>
>
>   /**
>
>    * RecordBuilder for MyAvroSchema instances.
>
>    */
>
>   public static class Builder extends org.apache.avro.specific.
> SpecificRecordBuilderBase<MyAvroSchema>
>
>     implements org.apache.avro.data.RecordBuilder<MyAvroSchema> {
>
>
>
>     private java.lang.Integer a;
>
>     private java.lang.CharSequence b;
>
>
>
>     /** Creates a new Builder */
>
>     private Builder() {
>
>       super(SCHEMA$);
>
>     }
>
>
>
>     /**
>
>      * Creates a Builder by copying an existing Builder.
>
>      * @param other The existing Builder to copy.
>
>      */
>
>     private Builder(MyAvroSchema.Builder other) {
>
>       super(other);
>
>       if (isValidValue(fields()[0], other.a)) {
>
>         this.a = data().deepCopy(fields()[0].schema(), other.a);
>
>         fieldSetFlags()[0] = true;
>
>       }
>
>       if (isValidValue(fields()[1], other.b)) {
>
>         this.b = data().deepCopy(fields()[1].schema(), other.b);
>
>         fieldSetFlags()[1] = true;
>
>       }
>
>     }
>
>
>
>     /**
>
>      * Creates a Builder by copying an existing MyAvroSchema instance
>
>      * @param other The existing instance to copy.
>
>      */
>
>     private Builder(MyAvroSchema other) {
>
>             super(SCHEMA$);
>
>       if (isValidValue(fields()[0], other.a)) {
>
>         this.a = data().deepCopy(fields()[0].schema(), other.a);
>
>         fieldSetFlags()[0] = true;
>
>       }
>
>       if (isValidValue(fields()[1], other.b)) {
>
>         this.b = data().deepCopy(fields()[1].schema(), other.b);
>
>         fieldSetFlags()[1] = true;
>
>       }
>
>     }
>
>
>
>     /**
>
>       * Gets the value of the 'a' field.
>
>       * @return The value.
>
>       */
>
>     public java.lang.Integer getA() {
>
>       return a;
>
>     }
>
>
>
>     /**
>
>       * Sets the value of the 'a' field.
>
>       * @param value The value of 'a'.
>
>       * @return This builder.
>
>       */
>
>     public MyAvroSchema.Builder setA(java.lang.Integer value) {
>
>       validate(fields()[0], value);
>
>       this.a = value;
>
>       fieldSetFlags()[0] = true;
>
>       return this;
>
>     }
>
>
>
>     /**
>
>       * Checks whether the 'a' field has been set.
>
>       * @return True if the 'a' field has been set, false otherwise.
>
>       */
>
>     public boolean hasA() {
>
>       return fieldSetFlags()[0];
>
>     }
>
>
>
>
>
>     /**
>
>       * Clears the value of the 'a' field.
>
>       * @return This builder.
>
>       */
>
>     public MyAvroSchema.Builder clearA() {
>
>       a = null;
>
>       fieldSetFlags()[0] = false;
>
>       return this;
>
>     }
>
>
>
>     /**
>
>       * Gets the value of the 'b' field.
>
>       * @return The value.
>
>       */
>
>     public java.lang.CharSequence getB() {
>
>       return b;
>
>     }
>
>
>
>     /**
>
>       * Sets the value of the 'b' field.
>
>       * @param value The value of 'b'.
>
>       * @return This builder.
>
>       */
>
>     public MyAvroSchema.Builder setB(java.lang.CharSequence value) {
>
>       validate(fields()[1], value);
>
>       this.b = value;
>
>       fieldSetFlags()[1] = true;
>
>       return this;
>
>     }
>
>
>
>     /**
>
>       * Checks whether the 'b' field has been set.
>
>       * @return True if the 'b' field has been set, false otherwise.
>
>       */
>
>     public boolean hasB() {
>
>       return fieldSetFlags()[1];
>
>     }
>
>
>
>
>
>     /**
>
>       * Clears the value of the 'b' field.
>
>       * @return This builder.
>
>       */
>
>     public MyAvroSchema.Builder clearB() {
>
>       b = null;
>
>       fieldSetFlags()[1] = false;
>
>       return this;
>
>     }
>
>
>
>     @Override
>
>     @SuppressWarnings("unchecked")
>
>     public MyAvroSchema build() {
>
>       try {
>
>         MyAvroSchema record = new MyAvroSchema();
>
>         record.a = fieldSetFlags()[0] ? this.a : (java.lang.Integer)
> defaultValue(fields()[0]);
>
>         record.b = fieldSetFlags()[1] ? this.b : (java.lang.CharSequence)
> defaultValue(fields()[1]);
>
>         return record;
>
>       } catch (java.lang.Exception e) {
>
>         throw new org.apache.avro.AvroRuntimeException(e);
>
>       }
>
>     }
>
>   }
>
>
>
>   @SuppressWarnings("unchecked")
>
>   private static final org.apache.avro.io.DatumWriter<MyAvroSchema>
>
>     WRITER$ = (org.apache.avro.io.DatumWriter<MyAvroSchema>)
> MODEL$.createDatumWriter(SCHEMA$);
>
>
>
>   @Override public void writeExternal(java.io.ObjectOutput out)
>
>     throws java.io.IOException {
>
>     WRITER$.write(this, SpecificData.getEncoder(out));
>
>   }
>
>
>
>   @SuppressWarnings("unchecked")
>
>   private static final org.apache.avro.io.DatumReader<MyAvroSchema>
>
>     READER$ = (org.apache.avro.io.DatumReader<MyAvroSchema>)
> MODEL$.createDatumReader(SCHEMA$);
>
>
>
>  @Override public void readExternal(java.io.ObjectInput in)
>
>     throws java.io.IOException {
>
>     READER$.read(this, SpecificData.getDecoder(in));
>
>   }
>
>
>
> }
>
>
>
> I will check out the other suggestions you made. One concern I have is
> that, from the stack trace I posted, it doesn’t actually look like the custom
> class is being called.
>
>
>
> *From:* vino yang <yanghua1127@gmail.com>
> *Sent:* 17 July 2018 05:49
> *To:* Porritt, James <James.Porritt@uk.mlp.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: AvroInputFormat NullPointerException issues
>
>
>
> Hi Porritt,
>
>
>
> Based on the exception stack trace you provided, it seems the exception
> occurs when initializing the Avro schema. You did not give the definition of
> the MyAvroSchema class, so I'd suggest the following:
>
>
>
> 1. Make sure the file path "file:///home/myuser/test.avro" exists on the
> TaskManager (TM) node that runs your source task.
>
> 2. Here is the flink-avro connector documentation [1] that you can refer to.
>
> 3. There are many test cases, such as this one [2], that you can use to test
> your program with only minor changes.
>
>
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/connectors.html#avro-support-in-flink
>
> [2]: https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
>
>
>
> ---
>
> thanks.
>
> vino.
>
>
>
> 2018-07-16 20:22 GMT+08:00 Porritt, James <James.Porritt@uk.mlp.com>:
>
> I’ve been trying to use the following code:
>
>
>
>         ExecutionEnvironment env = ExecutionEnvironment.
> getExecutionEnvironment();
>
>         Path path = new Path("file:///home/myuser/test.avro");
>
>         AvroInputFormat<MyAvroSchema> my_format = new
> AvroInputFormat<>(path, MyAvroSchema.class);
>
>         DataSet<MyAvroSchema> my_input = env.createInput(my_format);
>
>         my_input.print();
>
>         env.execute();
>
>
>
> to utilise this avro schema:
>
>
>
> {
>
>   "type" : "record",
>
>   "name" : "MyAvroSchema",
>
>   "fields" : [ {
>
>     "name" : "a",
>
>     "type" : [ "null", "int" ]
>
>   }, {
>
>     "name" : "b",
>
>     "type" : [ "null", "string" ]
>
>   }]
>
> }
>
>
>
> I created the MyAvroSchema class from this schema using avro-tools. I also
> converted the following JSON into a compatible Avro file stored at
> file:///home/myuser/test.avro
>
>
>
> {"a":{"int":123},"b":{"string":"hello"}}
>
>
>
> When I try to run this, however, I get:
>
>
>
> 2018-07-16 12:59:26,761 INFO  org.apache.flink.runtime.
> executiongraph.ExecutionGraph        - DataSource (at createInput(ExecutionEnvironment.java:548)
> (org.apache.flink.formats.avro.AvroInputFormat)
>
> ) (1/1) (302878b522f420f6b7866de4f32fcbd6) switched from RUNNING to
> FAILED.
>
> org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.
> util.concurrent.UncheckedExecutionException:
> java.lang.NullPointerException
>
>         at org.apache.avro.specific.SpecificData.getSchema(
> SpecificData.java:227)
>
>         at org.apache.avro.specific.SpecificDatumReader.<init>(
> SpecificDatumReader.java:37)
>
>         at org.apache.flink.formats.avro.AvroInputFormat.initReader(
> AvroInputFormat.java:122)
>
>         at org.apache.flink.formats.avro.AvroInputFormat.open(
> AvroInputFormat.java:111)
>
>         at org.apache.flink.formats.avro.AvroInputFormat.open(
> AvroInputFormat.java:54)
>
>         at org.apache.flink.runtime.operators.DataSourceTask.
> invoke(DataSourceTask.java:170)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>
>         at java.lang.Thread.run(Thread.java:748)
>
> Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
> java.lang.NullPointerException
>
>         at avro.shaded.com.google.common.cache.LocalCache$Segment.get(
> LocalCache.java:2234)
>
>         at avro.shaded.com.google.common.cache.LocalCache.get(
> LocalCache.java:3965)
>
>         at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(
> LocalCache.java:3969)
>
>         at avro.shaded.com.google.common.cache.LocalCache$
> LocalManualCache.get(LocalCache.java:4829)
>
>         at org.apache.avro.specific.SpecificData.getSchema(
> SpecificData.java:225)
>
>         ... 7 more
>
> Caused by: java.lang.NullPointerException
>
>         at java.lang.String.replace(String.java:2239)
>
>         at org.apache.avro.specific.SpecificData.createSchema(
> SpecificData.java:281)
>
>         at org.apache.avro.specific.SpecificData$2.load(
> SpecificData.java:218)
>
>         at org.apache.avro.specific.SpecificData$2.load(
> SpecificData.java:215)
>
>         at avro.shaded.com.google.common.cache.LocalCache$
> LoadingValueReference.loadFuture(LocalCache.java:3568)
>
>         at avro.shaded.com.google.common.cache.LocalCache$Segment.
> loadSync(LocalCache.java:2350)
>
>         at avro.shaded.com.google.common.cache.LocalCache$Segment.
> lockedGetOrLoad(LocalCache.java:2313)
>
>         at avro.shaded.com.google.common.cache.LocalCache$Segment.get(
> LocalCache.java:2228)
>
>         ... 11 more
>
>
>
> Can anyone suggest what might be causing the NullPointerException? I’m
> using flink-1.5.0 and avro-tools-1.8.2
>
>
>
> ######################################################################
>
> The information contained in this communication is confidential and
>
> intended only for the individual(s) named above. If you are not a named
>
> addressee, please notify the sender immediately and delete this email
>
> from your system and do not disclose the email or any part of it to any
>
> person. The views expressed in this email are the views of the author
>
> and do not necessarily represent the views of Millennium Capital Partners
>
> LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic
>
> communications of MCP LLP and its affiliates, including telephone
>
> communications, may be electronically archived and subject to review
>
> and/or disclosure to someone other than the recipient. MCP LLP is
>
> authorized and regulated by the Financial Conduct Authority. Millennium
>
> Capital Partners LLP is a limited liability partnership registered in
>
> England & Wales with number OC312897 and with its registered office at
>
> 50 Berkeley Street, London, W1J 8HD
> <https://maps.google.com/?q=50+Berkeley+Street,+London,+W1J+8HD&entry=gmail&source=g>
> .
>
> ######################################################################
>
>
>
> ######################################################################
>
> The information contained in this communication is confidential and
>
> intended only for the individual(s) named above. If you are not a named
>
> addressee, please notify the sender immediately and delete this email
>
> from your system and do not disclose the email or any part of it to any
>
> person. The views expressed in this email are the views of the author
>
> and do not necessarily represent the views of Millennium Capital Partners
>
> LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic
>
> communications of MCP LLP and its affiliates, including telephone
>
> communications, may be electronically archived and subject to review
>
> and/or disclosure to someone other than the recipient. MCP LLP is
>
> authorized and regulated by the Financial Conduct Authority. Millennium
>
> Capital Partners LLP is a limited liability partnership registered in
>
> England & Wales with number OC312897 and with its registered office at
>
> 50 Berkeley Street, London, W1J 8HD
> <https://maps.google.com/?q=50+Berkeley+Street,+London,+W1J+8HD&entry=gmail&source=g>
> .
>
> ######################################################################
>
>
> ######################################################################
> The information contained in this communication is confidential and
> intended only for the individual(s) named above. If you are not a named
> addressee, please notify the sender immediately and delete this email
> from your system and do not disclose the email or any part of it to any
> person. The views expressed in this email are the views of the author
> and do not necessarily represent the views of Millennium Capital Partners
> LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic
> communications of MCP LLP and its affiliates, including telephone
> communications, may be electronically archived and subject to review
> and/or disclosure to someone other than the recipient. MCP LLP is
> authorized and regulated by the Financial Conduct Authority. Millennium
> Capital Partners LLP is a limited liability partnership registered in
> England & Wales with number OC312897 and with its registered office at
> 50 Berkeley Street, London, W1J 8HD
> <https://maps.google.com/?q=50+Berkeley+Street,+London,+W1J+8HD&entry=gmail&source=g>
> .
> ######################################################################
>
>

Mime
View raw message