pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2494: [Schema] Introduce Schema.AUTO to detect schema automatically for consumers and readers
Date Sat, 01 Sep 2018 07:07:14 GMT
sijie closed pull request #2494: [Schema] Introduce Schema.AUTO to detect schema automatically for consumers and readers
URL: https://github.com/apache/incubator-pulsar/pull/2494
 
 
   

This is a pull request merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index b880953267..24573d5bc9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -26,6 +26,7 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
@@ -440,4 +441,98 @@ public String toString() {
         }
     }
 
+    @Test
+    public void testAvroProducerAndAutoSchemaConsumer() throws Exception {
+       log.info("-- Starting {} test --", methodName);
+
+       AvroSchema<AvroEncodedPojo> avroSchema =
+           AvroSchema.of(AvroEncodedPojo.class);
+
+       Producer<AvroEncodedPojo> producer = pulsarClient
+           .newProducer(avroSchema)
+           .topic("persistent://my-property/use/my-ns/my-topic1")
+           .create();
+
+       for (int i = 0; i < 10; i++) {
+           String message = "my-message-" + i;
+           producer.send(new AvroEncodedPojo(message));
+       }
+
+       Consumer<GenericRecord> consumer = pulsarClient
+           .newConsumer(Schema.AUTO())
+           .topic("persistent://my-property/use/my-ns/my-topic1")
+           .subscriptionName("my-subscriber-name")
+           .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+           .subscribe();
+
+       Message<GenericRecord> msg = null;
+       Set<String> messageSet = Sets.newHashSet();
+       for (int i = 0; i < 10; i++) {
+           msg = consumer.receive(5, TimeUnit.SECONDS);
+           GenericRecord receivedMessage = msg.getValue();
+           log.debug("Received message: [{}]", receivedMessage);
+           String expectedMessage = "my-message-" + i;
+           String actualMessage = (String) receivedMessage.getField("message");
+           testMessageOrderAndDuplicates(messageSet, actualMessage, expectedMessage);
+       }
+       // Acknowledge the consumption of all messages at once
+       consumer.acknowledgeCumulative(msg);
+       consumer.close();
+
+       SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService()
+           .getSchema("my-property/my-ns/my-topic1")
+           .get();
+
+       Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema());
+
+       log.info("-- Exiting {} test --", methodName);
+
+   }
+
+   @Test
+    public void testAvroProducerAndAutoSchemaReader() throws Exception {
+       log.info("-- Starting {} test --", methodName);
+
+       AvroSchema<AvroEncodedPojo> avroSchema =
+           AvroSchema.of(AvroEncodedPojo.class);
+
+       Producer<AvroEncodedPojo> producer = pulsarClient
+           .newProducer(avroSchema)
+           .topic("persistent://my-property/use/my-ns/my-topic1")
+           .create();
+
+       for (int i = 0; i < 10; i++) {
+           String message = "my-message-" + i;
+           producer.send(new AvroEncodedPojo(message));
+       }
+
+       Reader<GenericRecord> reader = pulsarClient
+           .newReader(Schema.AUTO())
+           .topic("persistent://my-property/use/my-ns/my-topic1")
+           .startMessageId(MessageId.earliest)
+           .create();
+
+       Message<GenericRecord> msg = null;
+       Set<String> messageSet = Sets.newHashSet();
+       for (int i = 0; i < 10; i++) {
+           msg = reader.readNext();
+           GenericRecord receivedMessage = msg.getValue();
+           log.debug("Received message: [{}]", receivedMessage);
+           String expectedMessage = "my-message-" + i;
+           String actualMessage = (String) receivedMessage.getField("message");
+           testMessageOrderAndDuplicates(messageSet, actualMessage, expectedMessage);
+       }
+       // Close the reader now that all messages have been read
+       reader.close();
+
+       SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService()
+           .getSchema("my-property/my-ns/my-topic1")
+           .get();
+
+       Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema());
+
+       log.info("-- Exiting {} test --", methodName);
+
+   }
+
 }
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
index 3088a4de21..113f26c50e 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.api;
 
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.BytesSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
@@ -77,4 +79,8 @@
     static <T> Schema<T> JSON(Class<T> clazz) {
         return JSONSchema.of(clazz);
     }
+
+    static Schema<GenericRecord> AUTO() {
+        return new AutoSchema();
+    }
 }
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
new file mode 100644
index 0000000000..5bf92b71cb
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * Auto detect schema.
+ */
+public class AutoSchema implements Schema<GenericRecord> {
+
+    private Schema<GenericRecord> schema;
+
+    public void setSchema(Schema<GenericRecord> schema) {
+        this.schema = schema;
+    }
+
+    private void ensureSchemaInitialized() {
+        checkState(null != schema, "Schema is not initialized before used");
+    }
+
+    @Override
+    public byte[] encode(GenericRecord message) {
+        ensureSchemaInitialized();
+
+        return schema.encode(message);
+    }
+
+    @Override
+    public GenericRecord decode(byte[] bytes) {
+        ensureSchemaInitialized();
+
+        return schema.decode(bytes);
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        ensureSchemaInitialized();
+
+        return schema.getSchemaInfo();
+    }
+}
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
index cfe9cb7cba..e5fcc3c4f6 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
@@ -27,6 +27,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.GenericAvroSchema;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -115,11 +116,29 @@ public void testEncodeAndDecode() {
     @Test
     public void testEncodeAndDecodeGenericRecord() {
         AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
+        GenericAvroSchema genericAvroSchema = new GenericAvroSchema(avroSchema.getSchemaInfo());
+
+        log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema());
+
+        testGenericSchema(avroSchema, genericAvroSchema);
+    }
+
+    @Test
+    public void testAutoSchema() {
+        AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
 
         GenericAvroSchema genericAvroSchema = new GenericAvroSchema(avroSchema.getSchemaInfo());
 
         log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema());
 
+        AutoSchema schema = new AutoSchema();
+        schema.setSchema(genericAvroSchema);
+
+        testGenericSchema(avroSchema, schema);
+    }
+
+    private void testGenericSchema(AvroSchema<Foo> avroSchema,
+                                   org.apache.pulsar.client.api.Schema<GenericRecord> genericRecordSchema) {
         int numRecords = 10;
         for (int i = 0; i < numRecords; i++) {
             Foo foo = new Foo();
@@ -132,7 +151,7 @@ public void testEncodeAndDecodeGenericRecord() {
 
             byte[] data = avroSchema.encode(foo);
 
-            GenericRecord record = genericAvroSchema.decode(data);
+            GenericRecord record = genericRecordSchema.decode(data);
             Object field1 = record.getField("field1");
             assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass());
             Object field2 = record.getField("field2");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index e156e10820..e82cf6c2f2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -61,12 +62,15 @@
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.impl.schema.GenericAvroSchema;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
@@ -243,6 +247,11 @@ public ClientConfigurationData getConfiguration() {
                     new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
         }
 
+        if (schema instanceof AutoSchema) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.InvalidConfigurationException("AutoSchema is only used by consumers to detect schemas automatically"));
+        }
+
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed : state = " + state.get()));
         }
@@ -377,6 +386,29 @@ public ClientConfigurationData getConfiguration() {
     }
 
     private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+        if (schema instanceof AutoSchema) {
+            AutoSchema autoSchema = (AutoSchema) schema;
+            return lookup.getSchema(TopicName.get(conf.getSingleTopic()))
+                    .thenCompose(schemaInfoOptional -> {
+                        if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) {
+                            GenericAvroSchema genericAvroSchema = new GenericAvroSchema(schemaInfoOptional.get());
+                            log.info("Auto detected schema for topic {} : {}",
+                                conf.getSingleTopic(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
+                            autoSchema.setSchema(genericAvroSchema);
+                            return doSingleTopicSubscribeAsync(conf, schema);
+                        } else {
+                            return FutureUtil.failedFuture(
+                                new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas"));
+                        }
+                    });
+        } else {
+            return doSingleTopicSubscribeAsync(conf, schema);
+        }
+    }
+
+
+
+    private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
 
         String topic = conf.getSingleTopic();
@@ -505,6 +537,26 @@ public ClientConfigurationData getConfiguration() {
     }
 
     public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
+        if (schema instanceof AutoSchema) {
+            AutoSchema autoSchema = (AutoSchema) schema;
+            return lookup.getSchema(TopicName.get(conf.getTopicName()))
+                    .thenCompose(schemaInfoOptional -> {
+                        if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) {
+                            GenericAvroSchema genericAvroSchema = new GenericAvroSchema(schemaInfoOptional.get());
+                            log.info("Auto detected schema for topic {} : {}",
+                                conf.getTopicName(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
+                            autoSchema.setSchema(genericAvroSchema);
+                            return doCreateReaderAsync(conf, schema);
+                        } else {
+                            return FutureUtil.failedFuture(
+                                new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas"));
+                        }
+                    });
+        } else {
+            return doCreateReaderAsync(conf, schema);
+        }
+    }
+    <T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
         }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index cbf7c91768..88adb532a4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -45,5 +45,10 @@
     /**
      * Serialize and deserialize via avro
      */
-    AVRO
+    AVRO,
+
+    /**
+     * Auto Detect Schema Type.
+     */
+    AUTO
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message