pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #1743: ByteBuffer schema
Date Wed, 26 Sep 2018 17:45:51 GMT
sijie closed pull request #1743: ByteBuffer schema
URL: https://github.com/apache/pulsar/pull/1743
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
index 8776ddf45f..621a698f53 100644
--- a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
+++ b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
@@ -27,6 +27,7 @@ message SchemaInfo {
         NONE = 1;
         STRING = 2;
         JSON = 3;
+        BYTE_BUFFER = 4;
     }
     message KeyValuePair {
         required string key = 1;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index 6b01cd6d83..2155c5720f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pulsar.client.api;
 
-import static org.testng.Assert.assertEquals;
-
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.nio.ByteBuffer;
+
+import static org.testng.Assert.assertEquals;
+
 public class SimpleSchemaTest extends ProducerConsumerBase {
 
     @BeforeMethod
@@ -60,4 +62,25 @@ public void testString() throws Exception {
             consumer.acknowledge(msg);
         }
     }
+
+    /**
+     * Sends ten messages through a producer using {@link Schema#BYTE_BUFFER} and checks that a
+     * consumer using the same schema receives each payload's bytes back, in the order sent.
+     */
+    @Test
+    public void testByteBuffer() throws Exception {
+        final String topic = "persistent://my-property/my-ns/my-topic1";
+
+        Consumer<ByteBuffer> consumer = pulsarClient.newConsumer(Schema.BYTE_BUFFER)
+                .topic(topic)
+                .subscriptionName("my-subscriber-name")
+                .subscribe();
+        Producer<ByteBuffer> producer = pulsarClient.newProducer(Schema.BYTE_BUFFER)
+                .topic(topic)
+                .create();
+
+        final int messageCount = 10;
+
+        for (int sent = 0; sent < messageCount; sent++) {
+            producer.send(ByteBuffer.wrap(expectedPayload(sent)));
+        }
+
+        for (int received = 0; received < messageCount; received++) {
+            Message<ByteBuffer> msg = consumer.receive();
+            // The consumer wraps the raw message data, so array() exposes exactly the payload bytes.
+            assertEquals(msg.getValue().array(), expectedPayload(received));
+            consumer.acknowledge(msg);
+        }
+    }
+
+    /** Bytes of "my-message-{index}" in the JVM default charset (the same on send and receive). */
+    private static byte[] expectedPayload(int index) {
+        return String.format("my-message-%d", index).getBytes();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
index 12c35c66a0..7b4e918a7f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -18,34 +18,37 @@
  */
 package org.apache.pulsar.client.api;
 
+import org.apache.pulsar.client.impl.schema.ByteBufferSchema;
 import org.apache.pulsar.client.impl.schema.BytesSchema;
 import org.apache.pulsar.client.impl.schema.StringSchema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
+import java.nio.ByteBuffer;
+
 /**
  * Message schema definition
  */
 public interface Schema<T> {
 
     /**
-     * Encode an object representing the message content into a byte array.
+     * Encode an object representing the message content into a ByteBuffer.
      *
      * @param message
      *            the message object
-     * @return a byte array with the serialized content
+     * @return a ByteBuffer with the serialized content
      * @throws SchemaSerializationException
      *             if the serialization fails
      */
-    byte[] encode(T message);
+    ByteBuffer encode(T message);
 
     /**
-     * Decode a byte array into an object using the schema definition and deserializer implementation
+     * Decode a ByteBuffer into an object using the schema definition and deserializer implementation
      *
-     * @param bytes
-     *            the byte array to decode
+     * @param buf
+     *            the ByteBuffer to decode
      * @return the deserialized object
      */
-    T decode(byte[] bytes);
+    T decode(ByteBuffer buf);
 
     /**
      * @return an object that represents the Schema associated metadata
@@ -61,4 +64,9 @@
      * Schema that can be used to encode/decode messages whose values are String. The payload is encoded with UTF-8.
      */
     Schema<String> STRING = new StringSchema();
+
+    /**
+     * Schema that uses Java's ByteBuffer class rather than raw byte arrays.
+     */
+    Schema<ByteBuffer> BYTE_BUFFER = new ByteBufferSchema();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index c019112c33..9c4ab02dab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -209,7 +209,7 @@ public boolean isExpired(int messageTTLInSeconds) {
 
     @Override
     public T getValue() {
-        return schema.decode(getData());
+        return schema.decode(ByteBuffer.wrap(getData()));
     }
 
     public long getSequenceId() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index f30edd60f2..c2f5c6ecf6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -67,7 +67,7 @@ public MessageId send() throws PulsarClientException {
 
     @Override
     public TypedMessageBuilder<T> value(T value) {
-        this.content = ByteBuffer.wrap(schema.encode(value));
+        this.content = schema.encode(value);
         return this;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
new file mode 100644
index 0000000000..dcc3080630
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Pass-through {@link Schema} for {@link ByteBuffer} payloads.
+ *
+ * <p>Both directions hand back the very same buffer instance they were given. No bytes are
+ * copied, and the buffer's position and limit are not touched.
+ */
+public class ByteBufferSchema implements Schema<ByteBuffer> {
+
+    /** Uses {@code message} itself as the serialized payload. */
+    @Override
+    public ByteBuffer encode(ByteBuffer message) {
+        return identity(message);
+    }
+
+    /** Uses {@code buf} itself as the decoded value. */
+    @Override
+    public ByteBuffer decode(ByteBuffer buf) {
+        return identity(buf);
+    }
+
+    /**
+     * Builds a new descriptor on every call: name {@code "ByteBuffer"}, type
+     * {@link SchemaType#BYTE_BUFFER}, and an empty schema definition.
+     */
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        final SchemaInfo byteBufferInfo = new SchemaInfo();
+        byteBufferInfo.setName("ByteBuffer");
+        byteBufferInfo.setType(SchemaType.BYTE_BUFFER);
+        byteBufferInfo.setSchema(new byte[0]);
+        return byteBufferInfo;
+    }
+
+    /** No-op shared by encode and decode: the payload already is a ByteBuffer. */
+    private static ByteBuffer identity(ByteBuffer buffer) {
+        return buffer;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
index 042f04c1bb..36f50544ab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
@@ -19,21 +19,29 @@
 package org.apache.pulsar.client.impl.schema;
 
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.util.ByteUtils;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.ByteBuffer;
 
 public class BytesSchema implements Schema<byte[]> {
     @Override
-    public byte[] encode(byte[] message) {
-        return message;
+    public ByteBuffer encode(byte[] message) {
+        return ByteBuffer.wrap(message);
     }
 
     @Override
-    public byte[] decode(byte[] bytes) {
-        return bytes;
+    public byte[] decode(ByteBuffer buf) {
+        return ByteUtils.bufferToBytes(buf);
     }
 
     @Override
     public SchemaInfo getSchemaInfo() {
-        return null;
+        SchemaInfo info = new SchemaInfo();
+        info.setName("Bytes");
+        info.setType(SchemaType.NONE);
+        info.setSchema(new byte[0]);
+        return info;
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 7eca43d008..e38a6e4a1f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -23,10 +23,12 @@
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.util.ByteUtils;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -43,18 +45,18 @@ private JSONSchema(SchemaInfo info, Class<T> pojo, ObjectMapper objectMapper) {
     }
 
     @Override
-    public byte[] encode(T message) throws SchemaSerializationException {
+    public ByteBuffer encode(T message) throws SchemaSerializationException {
         try {
-            return objectMapper.writeValueAsBytes(message);
+            return ByteBuffer.wrap(objectMapper.writeValueAsBytes(message));
         } catch (JsonProcessingException e) {
             throw new SchemaSerializationException(e);
         }
     }
 
     @Override
-    public T decode(byte[] bytes) {
+    public T decode(ByteBuffer buf) {
         try {
-            return objectMapper.readValue(new String(bytes), pojo);
+            return objectMapper.readValue(ByteUtils.bufferToBytes(buf), pojo);
         } catch (IOException e) {
             throw new RuntimeException(new SchemaSerializationException(e));
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
index a3f528a8ae..413fde4d6a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
@@ -19,9 +19,11 @@
 package org.apache.pulsar.client.impl.schema;
 
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.util.ByteUtils;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 
@@ -37,19 +39,19 @@ public StringSchema(Charset charset) {
         this.charset = charset;
     }
 
-    public byte[] encode(String message) {
-        return message.getBytes(charset);
+    public ByteBuffer encode(String message) {
+        return ByteBuffer.wrap(message.getBytes(charset));
     }
 
-    public String decode(byte[] bytes) {
-        return new String(bytes, charset);
+    public String decode(ByteBuffer buf) {
+        return new String(ByteUtils.bufferToBytes(buf), charset);
     }
 
     public SchemaInfo getSchemaInfo() {
-        SchemaInfo schemaInfo = new SchemaInfo();
-        schemaInfo.setName("String");
-        schemaInfo.setType(SchemaType.STRING);
-        schemaInfo.setSchema(new byte[0]);
-        return schemaInfo;
+        SchemaInfo info = new SchemaInfo();
+        info.setName("String");
+        info.setType(SchemaType.STRING);
+        info.setSchema(new byte[0]);
+        return info;
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ByteUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ByteUtils.java
new file mode 100644
index 0000000000..ce4024e523
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ByteUtils.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Helpers for converting between {@link ByteBuffer}s and byte arrays.
+ */
+public class ByteUtils {
+
+    /**
+     * Returns the readable bytes of {@code buf}, i.e. the bytes between its position and its limit.
+     *
+     * <p>If {@code buf} is backed by an accessible array and its readable region is exactly that
+     * whole array (as with {@code ByteBuffer.wrap(bytes)}), the backing array itself is returned
+     * without copying. Otherwise the readable region is copied into a new array. This covers
+     * sliced or offset heap buffers, direct buffers, and read-only buffers. The position, limit
+     * and mark of {@code buf} are never modified.
+     *
+     * @param buf the buffer to read; must not be {@code null}
+     * @return the readable bytes of {@code buf}; may share storage with {@code buf}
+     */
+    public static byte[] bufferToBytes(ByteBuffer buf) {
+        if (buf.hasArray()
+                && buf.arrayOffset() == 0
+                && buf.position() == 0
+                && buf.remaining() == buf.array().length) {
+            // Fast path: the backing array is exactly the readable content, so share it.
+            return buf.array();
+        }
+        // General path: read through a duplicate so only [position, limit) is copied and the
+        // caller's buffer is not advanced. array() must not be used here: it ignores the
+        // offset/position/limit, and it throws for direct and read-only buffers.
+        byte[] bytes = new byte[buf.remaining()];
+        buf.duplicate().get(bytes);
+        return bytes;
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/DefaultSchemasTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/DefaultSchemasTest.java
index b9386fcd4f..0009dc5d52 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/DefaultSchemasTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/DefaultSchemasTest.java
@@ -26,6 +26,7 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.util.ByteUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -73,25 +74,26 @@ public void testReaderInstantiation() {
     @Test
     public void testStringSchema() throws Exception {
         String testString = "hello world";
-        byte[] testBytes = testString.getBytes(StandardCharsets.UTF_8);
+        ByteBuffer testBuf = ByteBuffer.wrap(testString.getBytes(StandardCharsets.UTF_8));
         StringSchema stringSchema = new StringSchema();
-        assertTrue(stringSchema.decode(testBytes).equals(testString));
-        assertEquals(stringSchema.encode(testString), testBytes);
+        assertTrue(stringSchema.decode(testBuf).equals(testString));
+        assertEquals(stringSchema.encode(testString), testBuf);
 
         Message<String> msg1 = MessageBuilder.create(stringSchema)
-                .setContent(testBytes)
+                .setContent(testBuf)
                 .build();
-        assertEquals(stringSchema.decode(msg1.getData()), testString);
+        assertEquals(stringSchema.decode(ByteBuffer.wrap(msg1.getData())), testString);
 
         Message<String> msg2 = MessageBuilder.create(stringSchema)
                 .setValue(testString)
                 .build();
-        assertEquals(stringSchema.encode(testString), msg2.getData());
+        assertEquals(ByteUtils.bufferToBytes(stringSchema.encode(testString)), msg2.getData());
+
+        ByteBuffer testBuf2 = ByteBuffer.wrap(testString.getBytes(StandardCharsets.UTF_16));
 
-        byte[] bytes2 = testString.getBytes(StandardCharsets.UTF_16);
         StringSchema stringSchemaUtf16 = new StringSchema(StandardCharsets.UTF_16);
-        assertTrue(stringSchemaUtf16.decode(bytes2).equals(testString));
-        assertEquals(stringSchemaUtf16.encode(testString), bytes2);
+        assertTrue(stringSchemaUtf16.decode(testBuf2).equals(testString));
+        assertEquals(stringSchemaUtf16.encode(testString), testBuf2);
     }
 
     @AfterClass
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index 44d78c9bc9..1aa5196708 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -37,5 +37,10 @@
      */
     JSON,
 
+    /**
+     * Byte buffer schema
+     */
+    BYTE_BUFFER,
+
     PROTOBUF
 }
diff --git a/site/docs/latest/clients/Java.md b/site/docs/latest/clients/Java.md
index 915adb195e..4c58246f17 100644
--- a/site/docs/latest/clients/Java.md
+++ b/site/docs/latest/clients/Java.md
@@ -383,25 +383,36 @@ The following schema formats are currently available for Java:
 
   ```java
   Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
-        .topic("some-raw-bytes-topic")
-        .create();
+          .topic("some-raw-bytes-topic")
+          .create();
   ```
 
   Or, equivalently:
 
   ```java
   Producer<byte[]> bytesProducer = client.newProducer()
-        .topic("some-raw-bytes-topic")
-        .create();
+          .topic("some-raw-bytes-topic")
+          .create();
   ```
 
 * `String` for normal UTF-8-encoded string data. This schema can be applied using `Schema.STRING`:
 
   ```java
   Producer<String> stringProducer = client.newProducer(Schema.STRING)
-        .topic("some-string-topic")
-        .create();
+          .topic("some-string-topic")
+          .create();
   ```
+
+* Java NIO [`ByteBuffer`](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html)s, which can be applied using `Schema.BYTE_BUFFER`:
+
+  ```java
+  import java.nio.ByteBuffer;
+
+  Producer<ByteBuffer> byteBufProducer = client.newProducer(Schema.BYTE_BUFFER)
+          .topic("some-byte-buf-topic")
+          .create();
+  ```
+
 * JSON schemas can be created for POJOs using the `JSONSchema` class. Here's an example:
 
   ```java
diff --git a/site/docs/latest/getting-started/ConceptsAndArchitecture.md b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
index 6fcad5912a..3f4a1548f6 100644
--- a/site/docs/latest/getting-started/ConceptsAndArchitecture.md
+++ b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
@@ -636,6 +636,7 @@ The following formats are supported by the Pulsar schema registry:
 
 * None. If no schema is specified for a topic, producers and consumers will handle raw bytes.
 * `String` (used for UTF-8-encoded strings)
+* [`ByteBuffer`](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html)
 * [JSON](https://www.json.org/)
 
 For usage instructions, see the documentation for your preferred client library:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message