pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: NullPointerException at using BytesSchema.of() (#3754)
Date Fri, 08 Mar 2019 16:33:44 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 85afd6e  NullPointerException at using BytesSchema.of() (#3754)
85afd6e is described below

commit 85afd6e49912996e594047193811826031134f9a
Author: Sijie Guo <guosijie@gmail.com>
AuthorDate: Sat Mar 9 00:33:39 2019 +0800

    NullPointerException at using BytesSchema.of() (#3754)
    
    Fixes #3734
    
    *Motivation*
    
    An exception occurs when using `BytesSchema.of()`:
    
    ```
    Exception in thread "main" java.lang.ExceptionInInitializerError
    	at org.apache.pulsar.examples.simple.ProducerExample.main(ProducerExample.java:32)
    Caused by: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
    	at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:36)
    	at org.apache.pulsar.client.internal.DefaultImplementation.newKeyValueSchema(DefaultImplementation.java:158)
    	at org.apache.pulsar.client.api.Schema.<clinit>(Schema.java:123)
    	... 1 more
    Caused by: java.lang.reflect.InvocationTargetException
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    	at org.apache.pulsar.client.internal.DefaultImplementation.lambda$newKeyValueSchema$16(DefaultImplementation.java:160)
    	at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:34)
    	... 3 more
    Caused by: java.lang.NullPointerException
    	at org.apache.pulsar.client.impl.schema.KeyValueSchema.<init>(KeyValueSchema.java:68)
    	... 9 more
    ```
    
    The problem is caused by an unusual class loading and reflection sequence.
    
    When accessing `BytesSchema`, `BytesSchema` will try to initialize `Schema`. When initializing
    `Schema`, it attempts to initialize `KV_BYTES` using reflection, and initializing `KV_BYTES`
    requires `BytesSchema`. As a result, `KV_BYTES` is not initialized correctly.
    
    The change is to avoid this recursive class loading.
---
 .../java/org/apache/pulsar/client/api/Schema.java  |  4 +-
 .../client/internal/DefaultImplementation.java     | 10 ++++-
 .../pulsar/client/impl/schema/KeyValueSchema.java  | 16 ++++++-
 .../pulsar/client/impl/schema/BytesSchemaTest.java | 52 ++++++++++++++++++++++
 .../client/impl/schema/KeyValueSchemaTest.java     |  4 +-
 .../pulsar/functions/source/TopicSchema.java       |  2 +-
 6 files changed, 80 insertions(+), 8 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 58b707e..5660773 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -196,7 +196,9 @@ public interface Schema<T> {
     /**
      * Schema that can be used to encode/decode KeyValue.
      */
-    Schema<KeyValue<byte[], byte[]>> KV_BYTES = DefaultImplementation.newKeyValueSchema(BYTES, BYTES);
+    static Schema<KeyValue<byte[], byte[]>> KV_BYTES() {
+        return DefaultImplementation.newKeyValueBytesSchema();
+    }
 
     /**
      * Key Value Schema whose underneath key and value schemas are JSONSchema.
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index a4f4cfd..873b041 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -212,10 +212,16 @@ public class DefaultImplementation {
                         .newInstance());
     }
 
+    public static Schema<KeyValue<byte[], byte[]>> newKeyValueBytesSchema() {
+        return catchExceptions(
+                () -> (Schema<KeyValue<byte[], byte[]>>) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchema",
+                        "kvBytes").invoke(null));
+    }
+
     public static <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> keySchema, Schema<V> valueSchema) {
         return catchExceptions(
-                () -> (Schema<KeyValue<K, V>>) getConstructor("org.apache.pulsar.client.impl.schema.KeyValueSchema",
-                        Schema.class, Schema.class).newInstance(keySchema, valueSchema));
+                () -> (Schema<KeyValue<K, V>>) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchema",
+                        "of", Schema.class, Schema.class).invoke(null, keySchema, valueSchema));
     }
 
     public static <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Class<K> key, Class<V> value, SchemaType type) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index 0689b7e..a268aea 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -55,8 +55,20 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         }
     }
 
-    public KeyValueSchema(Schema<K> keySchema,
-                          Schema<V> valueSchema) {
+    public static <K, V> Schema<KeyValue<K, V>> of(Schema<K> keySchema, Schema<V> valueSchema) {
+        return new KeyValueSchema<>(keySchema, valueSchema);
+    }
+
+    private static final Schema<KeyValue<byte[], byte[]>> KV_BYTES = new KeyValueSchema<>(
+        BytesSchema.of(),
+        BytesSchema.of());
+
+    public static Schema<KeyValue<byte[], byte[]>> kvBytes() {
+        return KV_BYTES;
+    }
+
+    private KeyValueSchema(Schema<K> keySchema,
+                           Schema<V> valueSchema) {
         this.keySchema = keySchema;
         this.valueSchema = valueSchema;
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BytesSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BytesSchemaTest.java
new file mode 100644
index 0000000..9a2b6ca
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BytesSchemaTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertSame;
+
+import org.apache.pulsar.client.api.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link BytesSchema}.
+ */
+public class BytesSchemaTest {
+
+    @Test
+    public void testBytesSchemaOf() {
+        testBytesSchema(BytesSchema.of());
+    }
+
+    @Test
+    public void testSchemaBYTES() {
+        testBytesSchema(Schema.BYTES);
+    }
+
+    private void testBytesSchema(Schema<byte[]> schema) {
+        byte[] data = "hello world".getBytes(UTF_8);
+
+        byte[] serializedData = schema.encode(data);
+        assertSame(data, serializedData);
+
+        byte[] deserializedData = schema.decode(serializedData);
+        assertSame(data, deserializedData);
+    }
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index d355fc7..2963d62 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -137,8 +137,8 @@ public class KeyValueSchemaTest {
         byte[] fooBytes = fooAvroSchema.encode(foo);
         byte[] barBytes = barAvroSchema.encode(bar);
 
-        byte[] encodeBytes = Schema.KV_BYTES.encode(new KeyValue<>(fooBytes, barBytes));
-        KeyValue<byte[], byte[]> decodeKV = Schema.KV_BYTES.decode(encodeBytes);
+        byte[] encodeBytes = Schema.KV_BYTES().encode(new KeyValue<>(fooBytes, barBytes));
+        KeyValue<byte[], byte[]> decodeKV = Schema.KV_BYTES().decode(encodeBytes);
 
         Foo fooBack = fooAvroSchema.decode(decodeKV.getKey());
         Bar barBack = barAvroSchema.decode(decodeKV.getValue());
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 01ab958..21f8b00 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -130,7 +130,7 @@ public class TopicSchema {
             return JSONSchema.of(clazz);
 
         case KEY_VALUE:
-            return (Schema<T>)Schema.KV_BYTES;
+            return (Schema<T>)Schema.KV_BYTES();
 
         case PROTOBUF:
             return ProtobufSchema.ofGenericClass(clazz, Collections.emptyMap());


Mime
View raw message