pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Added support for Kafka partitioner and explicit setting of partition on record (#3462)
Date Fri, 08 Feb 2019 05:25:34 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c448aaa  Added support for Kafka partitioner and explicit setting of partition on
record (#3462)
c448aaa is described below

commit c448aaaed567f961a0e12693f6375b806a45fbc5
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Thu Feb 7 21:25:28 2019 -0800

    Added support for Kafka partitioner and explicit setting of partition on record (#3462)
    
    * Added support for Kafka partitioner and explicit setting of partition on record
    
    * Addressed comments
    
    * Use StandardCharsets and fixed alignment
    
    * Fixed initialization of custom partitioner
    
    * Feed realistic cluster metadata for Kafka partitioner implementation
---
 .../clients/producer/PulsarKafkaProducer.java      |  44 ++++++-
 .../client/kafka/compat/KafkaMessageRouter.java    |  44 +++++++
 site2/docs/adaptors-kafka.md                       |   7 +-
 .../integration/compat/kafka/KafkaApiTest.java     | 128 +++++++++++++++++++++
 4 files changed, 215 insertions(+), 8 deletions(-)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 0552591..b9fbd5e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.kafka.clients.producer;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
@@ -32,6 +33,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
@@ -46,6 +48,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
 import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
@@ -60,6 +63,9 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V>
{
     private final Serializer<K> keySerializer;
     private final Serializer<V> valueSerializer;
 
+    private final Partitioner partitioner;
+    private volatile Cluster cluster = Cluster.empty();
+
     /** Map that contains the last future for each producer */
     private final ConcurrentMap<String, CompletableFuture<MessageId>> lastSendFuture
= new ConcurrentHashMap<>();
 
@@ -105,6 +111,9 @@ public class PulsarKafkaProducer<K, V> implements Producer<K,
V> {
             producerConfig.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
         }
 
+        partitioner = producerConfig.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class);
+        partitioner.configure(producerConfig.originals());
+
         String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
         try {
             client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build();
@@ -126,6 +135,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K,
V> {
             pulsarProducerBuilder.compressionType(CompressionType.LZ4);
         }
 
+        pulsarProducerBuilder.messageRouter(new KafkaMessageRouter(lingerMs));
 
         int sendTimeoutMillis = Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG,
"60000"));
         pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, TimeUnit.MILLISECONDS);
@@ -212,6 +222,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K,
V> {
     @Override
     public void close() {
         close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        partitioner.close();
     }
 
     @Override
@@ -225,19 +236,34 @@ public class PulsarKafkaProducer<K, V> implements Producer<K,
V> {
 
     private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String
topic) {
         try {
+            // Add the partitions info for the new topic
+            cluster = cluster.withPartitions(readPartitionsInfo(topic));
             return pulsarProducerBuilder.clone().topic(topic).create();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
     }
 
-    private int buildMessage(TypedMessageBuilder<byte[]> builder, ProducerRecord<K,
V> record) {
-        if (record.partition() != null) {
-            throw new UnsupportedOperationException("");
+    private Map<TopicPartition, PartitionInfo> readPartitionsInfo(String topic) {
+        List<String> partitions = client.getPartitionsForTopic(topic).join();
+
+        Map<TopicPartition, PartitionInfo> partitionsInfo = new HashMap<>();
+
+        for (int i = 0; i < partitions.size(); i++) {
+            TopicPartition tp = new TopicPartition(topic, i);
+            PartitionInfo pi = new PartitionInfo(topic, i, null, null, null);
+            partitionsInfo.put(tp, pi);
         }
 
+        return partitionsInfo;
+    }
+
+    private int buildMessage(TypedMessageBuilder<byte[]> builder, ProducerRecord<K,
V> record) {
+        byte[] keyBytes = null;
         if (record.key() != null) {
-            builder.key(getKey(record.topic(), record.key()));
+            String key = getKey(record.topic(), record.key());
+            keyBytes = key.getBytes(StandardCharsets.UTF_8);
+            builder.key(key);
         }
 
         if (record.timestamp() != null) {
@@ -246,6 +272,16 @@ public class PulsarKafkaProducer<K, V> implements Producer<K,
V> {
 
         byte[] value = valueSerializer.serialize(record.topic(), record.value());
         builder.value(value);
+
+        if (record.partition() != null) {
+            // Partition was explicitely set on the record
+            builder.property(KafkaMessageRouter.PARTITION_ID, record.partition().toString());
+        } else {
+            // Get the partition id from the partitioner
+            int partition = partitioner.partition(record.topic(), record.key(), keyBytes,
record.value(), value, cluster);
+            builder.property(KafkaMessageRouter.PARTITION_ID, Integer.toString(partition));
+        }
+
         return value.length;
     }
 
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java
new file mode 100644
index 0000000..d3fb915
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
+
+public class KafkaMessageRouter extends RoundRobinPartitionMessageRouterImpl {
+
+    public static final String PARTITION_ID = "pulsar.partition.id";
+
+    public KafkaMessageRouter(long maxBatchingDelayMs) {
+        super(HashingScheme.JavaStringHash, ThreadLocalRandom.current().nextInt(), true,
maxBatchingDelayMs);
+    }
+
+    @Override
+    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+        if (msg.hasProperty(PARTITION_ID)) {
+            return Integer.parseInt(msg.getProperty(PARTITION_ID));
+        } else {
+            return super.choosePartition(msg, metadata);
+        }
+    }
+}
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index 21c9b26..abe3a47 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -116,7 +116,7 @@ APIs:
 
 | Producer Method                                                               | Supported
| Notes                                                                    |
 |:------------------------------------------------------------------------------|:----------|:-------------------------------------------------------------------------|
-| `Future<RecordMetadata> send(ProducerRecord<K, V> record)`                
   | Yes       | Currently no support for explicitly set the partition id when publishing
|
+| `Future<RecordMetadata> send(ProducerRecord<K, V> record)`                
   | Yes       |                                                                         
|
 | `Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)`
| Yes       |                                                                          |
 | `void flush()`                                                                | Yes   
   |                                                                          |
 | `List<PartitionInfo> partitionsFor(String topic)`                             | No
       |                                                                          |
@@ -129,7 +129,7 @@ Properties:
 | Config property                         | Supported | Notes                           
                                             |
 |:----------------------------------------|:----------|:------------------------------------------------------------------------------|
 | `acks`                                  | Ignored   | Durability and quorum writes are
configured at the namespace level            |
-| `auto.offset.reset`			  | Yes       | Will have a default value of 'latest' if user does
not give specific setting. |
+| `auto.offset.reset`                     | Yes       | Will have a default value of `latest`
if user does not give specific setting. |
 | `batch.size`                            | Ignored   |                                 
                                             |
 | `block.on.buffer.full`                  | Yes       | If true it will block producer, otherwise
give error                          |
 | `bootstrap.servers`                     | Yes       | Needs to point to a single Pulsar
service URL                                 |
@@ -146,7 +146,7 @@ Properties:
 | `metric.reporters`                      | Ignored   |                                 
                                             |
 | `metrics.num.samples`                   | Ignored   |                                 
                                             |
 | `metrics.sample.window.ms`              | Ignored   |                                 
                                             |
-| `partitioner.class`                     | Ignored   |                                 
                                             |
+| `partitioner.class`                     | Yes       |                                 
                                             |
 | `receive.buffer.bytes`                  | Ignored   |                                 
                                             |
 | `reconnect.backoff.ms`                  | Ignored   |                                 
                                             |
 | `request.timeout.ms`                    | Ignored   |                                 
                                             |
@@ -261,4 +261,3 @@ You can configure Pulsar authentication provider directly from the Kafka
propert
 | [`pulsar.consumer.acknowledgments.group.time.millis`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#acknowledgmentGroupTime-long-java.util.concurrent.TimeUnit-)
| 100 | Set the max amount of group time for consumers to send out the acknowledgments to
the broker |
 | [`pulsar.consumer.total.receiver.queue.size.across.partitions`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setMaxTotalReceiverQueueSizeAcrossPartitions-int-)
| 50000 | Set the max total receiver queue size across partitions |
 | [`pulsar.consumer.subscription.topics.mode`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#subscriptionTopicsMode-Mode-)
| PersistentOnly | Set the subscription topic mode for consumers |
-
diff --git a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
index fe2a4d9..0e520de 100644
--- a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
+++ b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
@@ -40,9 +40,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -301,6 +303,132 @@ public class KafkaApiTest extends PulsarStandaloneTestSuite {
     }
 
     @Test
+    public void testExplicitPartitions() throws Exception {
+        String topic = "testExplicitPartitions";
+
+        // Create 8 partitions in topic
+        @Cleanup
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build();
+        admin.topics().createPartitionedTopic(topic, 8);
+
+        Properties producerProperties = new Properties();
+        producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
+        producerProperties.put("key.serializer", IntegerSerializer.class.getName());
+        producerProperties.put("value.serializer", StringSerializer.class.getName());
+
+        @Cleanup
+        Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
+        props.put("group.id", "my-subscription-name");
+        props.put("enable.auto.commit", "true");
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        // Create Kakfa consumer and verify all messages came from intended partition
+        @Cleanup
+        Consumer<String, String> consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(Arrays.asList(topic));
+
+        int N = 8 * 3;
+
+        final int choosenPartition = 5;
+
+        for (int i = 0; i < N; i++) {
+            producer.send(new ProducerRecord<>(topic, choosenPartition, i, "hello-"
+ i));
+        }
+
+        producer.flush();
+
+        for (int i = 0; i < N;) {
+            ConsumerRecords<String, String> records = consumer.poll(100);
+            i += records.count();
+
+            records.forEach(record -> {
+                assertEquals(record.partition(), choosenPartition);
+            });
+        }
+
+        // No more messages for this consumer
+        ConsumerRecords<String, String> records = consumer.poll(100);
+        assertEquals(records.count(), 0);
+    }
+
+    public static class MyCustomPartitioner implements Partitioner {
+
+        static int USED_PARTITION = 3;
+
+        @Override
+        public void configure(Map<String, ?> conf) {
+            // Do nothing
+        }
+
+        @Override
+        public void close() {
+            // Do nothing
+        }
+
+        @Override
+        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[]
valueBytes, Cluster cluster) {
+            // Dummy implementation that always return same partition
+            return USED_PARTITION;
+        }
+    }
+
+    @Test
+    public void testCustomRouter() throws Exception {
+        String topic = "testCustomRouter";
+
+        // Create 8 partitions in topic
+        @Cleanup
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build();
+        admin.topics().createPartitionedTopic(topic, 8);
+
+        Properties producerProperties = new Properties();
+        producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
+        producerProperties.put("key.serializer", IntegerSerializer.class.getName());
+        producerProperties.put("value.serializer", StringSerializer.class.getName());
+        producerProperties.put("partitioner.class", MyCustomPartitioner.class.getName());
+
+        @Cleanup
+        Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
+        props.put("group.id", "my-subscription-name");
+        props.put("enable.auto.commit", "true");
+        props.put("key.deserializer", IntegerDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        // Create Kakfa consumer and verify all messages came from intended partition
+        @Cleanup
+        Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(Arrays.asList(topic));
+
+        int N = 8 * 3;
+
+        for (int i = 0; i < N; i++) {
+            producer.send(new ProducerRecord<>(topic, i, "hello-" + i));
+        }
+
+        producer.flush();
+
+        for (int i = 0; i < N;) {
+            ConsumerRecords<Integer, String> records = consumer.poll(100);
+            i += records.count();
+
+            records.forEach(record -> {
+                assertEquals(record.partition(), MyCustomPartitioner.USED_PARTITION);
+            });
+        }
+
+        // No more messages for this consumer
+        ConsumerRecords<Integer, String> records = consumer.poll(100);
+        assertEquals(records.count(), 0);
+    }
+
+    @Test
     public void testConsumerSeek() throws Exception {
         String topic = "testConsumerSeek";
 


Mime
View raw message