pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] merlimat closed pull request #962: Raw Reader for Pulsar Topics
Date Thu, 01 Jan 1970 00:00:00 GMT
merlimat closed pull request #962: Raw Reader for Pulsar Topics
URL: https://github.com/apache/incubator-pulsar/pull/962
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawMessage.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawMessage.java
new file mode 100644
index 000000000..01ea641f8
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawMessage.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+
+/**
+ * A representation of a message in a topic in its raw form (i.e. as it is stored in a managed ledger).
+ * RawMessages hold a refcount to the contains ByteBuf, so they must be closed for the ByteBuf to be freed.
+ */
+public interface RawMessage extends AutoCloseable {
+    /**
+     * Get the message ID of this message.
+     */
+    MessageId getMessageId();
+
+    /**
+     * Get the protobuf representation of the message ID of this message.
+     */
+    MessageIdData getMessageIdData();
+
+    /**
+     * Get a ByteBuf which contains the headers and payload of the message.
+     * The payload may be compressed and encrypted, but whether this is the case can be verified
+     * by decoding the headers which are not.
+     */
+    ByteBuf getHeadersAndPayload();
+
+    @Override
+    void close();
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
new file mode 100644
index 000000000..5883a611e
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.RawReaderImpl;
+
+/**
+ * Topic reader which receives raw messages (i.e. as they are stored in the managed ledger).
+ */
+public interface RawReader {
+    /**
+     * Create a raw reader for a topic.
+     */
+    public static CompletableFuture<RawReader> create(PulsarClient client, String topic) {
+        CompletableFuture<Consumer> future = new CompletableFuture<>();
+        RawReader r = new RawReaderImpl((PulsarClientImpl)client, topic, future);
+        return future.thenCompose((consumer) -> r.seekAsync(MessageId.earliest)).thenApply((ignore) -> r);
+    }
+
+    /**
+     * Seek to a location in the topic. After the seek, the first message read will be the one with
+     * with the specified message ID.
+     * @param messageId the message ID to seek to
+     */
+    CompletableFuture<Void> seekAsync(MessageId messageId);
+
+    /**
+     * Read the next raw message for the topic.
+     * @return a completable future which will return the next RawMessage in the topic.
+     */
+    CompletableFuture<RawMessage> readNextAsync();
+
+    /**
+     * Close the raw reader.
+     */
+    CompletableFuture<Void> closeAsync();
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java
new file mode 100644
index 000000000..6c8f7682e
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+public class RawMessageImpl implements RawMessage {
+    private final MessageIdData id;
+    private final ByteBuf headersAndPayload;
+
+    RawMessageImpl(MessageIdData id, ByteBuf headersAndPayload) {
+        this.id = id;
+        this.headersAndPayload = headersAndPayload.retainedSlice();
+    }
+
+    @Override
+    public MessageId getMessageId() {
+        return new MessageIdImpl(id.getLedgerId(), id.getEntryId(), id.getPartition());
+    }
+
+    @Override
+    public MessageIdData getMessageIdData() {
+        return id;
+    }
+
+    @Override
+    public ByteBuf getHeadersAndPayload() {
+        return headersAndPayload.slice();
+    }
+
+    @Override
+    public void close() {
+        headersAndPayload.release();
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
new file mode 100644
index 000000000..c861fdd00
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.RawReader;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
+import io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RawReaderImpl implements RawReader {
+
+    final static int DEFAULT_RECEIVER_QUEUE_SIZE = 1000;
+    private final PulsarClientImpl client;
+    private final String topic;
+    private final String subscription;
+    private final ConsumerConfiguration consumerConfiguration;
+    private RawConsumerImpl consumer;
+
+    public RawReaderImpl(PulsarClientImpl client, String topic, CompletableFuture<Consumer> consumerFuture) {
+        this.client = client;
+        this.topic = topic;
+
+        subscription = "raw-reader";
+
+        consumerConfiguration = new ConsumerConfiguration();
+        consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
+        consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
+
+        consumer = new RawConsumerImpl(client, topic, subscription, consumerConfiguration,
+                                       consumerFuture);
+    }
+
+    @Override
+    public CompletableFuture<Void> seekAsync(MessageId messageId) {
+        return consumer.seekAsync(messageId);
+    }
+
+    @Override
+    public CompletableFuture<RawMessage> readNextAsync() {
+        return consumer.receiveRawAsync();
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return consumer.closeAsync();
+    }
+
+    static class RawConsumerImpl extends ConsumerImpl {
+        final BlockingQueue<RawMessageAndCnx> incomingRawMessages;
+        final Queue<CompletableFuture<RawMessage>> pendingRawReceives;
+
+        RawConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
+                        CompletableFuture<Consumer> consumerFuture) {
+            super(client, topic, subscription, conf,
+                  client.externalExecutorProvider().getExecutor(), -1, consumerFuture,
+                  SubscriptionMode.Durable, MessageId.earliest);
+            incomingRawMessages = new GrowableArrayBlockingQueue<>();
+            pendingRawReceives = new ConcurrentLinkedQueue<>();
+        }
+
+        void tryCompletePending() {
+            CompletableFuture<RawMessage> future = null;
+            RawMessageAndCnx messageAndCnx = null;
+
+            synchronized (this) {
+                if (!pendingRawReceives.isEmpty()
+                    && !incomingRawMessages.isEmpty()) {
+                    future = pendingRawReceives.remove();
+                    messageAndCnx = incomingRawMessages.remove();
+                }
+            }
+            if (future == null) {
+                assert(messageAndCnx == null);
+            } else {
+                future.complete(messageAndCnx.msg);
+
+                ClientCnx currentCnx = cnx();
+                if (currentCnx == messageAndCnx.cnx) {
+                    increaseAvailablePermits(currentCnx);
+                }
+            }
+        }
+
+        CompletableFuture<RawMessage> receiveRawAsync() {
+            CompletableFuture<RawMessage> result = new CompletableFuture<>();
+            pendingRawReceives.add(result);
+            tryCompletePending();
+            return result;
+        }
+
+        private void reset() {
+            List<CompletableFuture<RawMessage>> toError = new ArrayList<>();
+            synchronized (this) {
+                while (!pendingRawReceives.isEmpty()) {
+                    toError.add(pendingRawReceives.remove());
+                }
+                incomingRawMessages.clear();
+            }
+            toError.forEach((f) -> f.completeExceptionally(
+                                    new PulsarClientException.ConsumerBusyException("Sought while reading")));
+        }
+
+        @Override
+        public CompletableFuture<Void> seekAsync(MessageId messageId) {
+            reset();
+            return super.seekAsync(messageId);
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            reset();
+            return super.closeAsync();
+        }
+
+        @Override
+        void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] Received raw message: {}/{}", topic, subscription,
+                          messageId.getLedgerId(), messageId.getEntryId());
+            }
+            incomingRawMessages.add(
+                    new RawMessageAndCnx(new RawMessageImpl(messageId, headersAndPayload), cnx));
+            tryCompletePending();
+        }
+    }
+
+    private static class RawMessageAndCnx {
+        final RawMessage msg;
+        final ClientCnx cnx;
+
+        RawMessageAndCnx(RawMessage msg, ClientCnx cnx) {
+            this.msg = msg;
+            this.cnx = cnx;
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(RawReaderImpl.class);
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
new file mode 100644
index 000000000..001d7e04c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.RawReader;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class RawReaderTest extends MockedPulsarServiceBaseTest {
+    private static final Logger log = LoggerFactory.getLogger(RawReaderTest.class);
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+
+        admin.clusters().createCluster("use",
+                new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT));
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Set<String> publishMessages(String topic, int count) throws Exception {
+        Set<String> keys = new HashSet<>();
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        try (Producer producer = pulsarClient.createProducer(topic, producerConf)) {
+
+            for (int i = 0; i < count; i++) {
+                String key = "key"+i;
+                byte[] data = ("my-message-" + i).getBytes();
+                producer.send(MessageBuilder.create()
+                              .setKey(key)
+                              .setContent(data).build());
+                keys.add(key);
+            }
+        }
+        return keys;
+    }
+
+    private static String extractKey(RawMessage m) throws Exception {
+        ByteBuf headersAndPayload = m.getHeadersAndPayload();
+        MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+        return msgMetadata.getPartitionKey();
+    }
+
+    @Test
+    public void testRawReader() throws Exception {
+        int numKeys = 10;
+
+        String topic = "persistent://my-property/use/my-ns/my-raw-topic";
+
+        Set<String> keys = publishMessages(topic, numKeys);
+
+        RawReader reader = RawReader.create(pulsarClient, topic).get();
+        try {
+            while (true) { // should break out with TimeoutException
+                try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
+                    Assert.assertTrue(keys.remove(extractKey(m)));
+                }
+            }
+        } catch (TimeoutException te) {
+            // ok
+        }
+
+        Assert.assertTrue(keys.isEmpty());
+    }
+
+    @Test
+    public void testSeekToStart() throws Exception {
+        int numKeys = 10;
+        String topic = "persistent://my-property/use/my-ns/my-raw-topic";
+
+        publishMessages(topic, numKeys);
+
+        Set<String> readKeys = new HashSet<>();
+        RawReader reader = RawReader.create(pulsarClient, topic).get();
+        try {
+            while (true) { // should break out with TimeoutException
+                try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
+                    readKeys.add(extractKey(m));
+                }
+            }
+        } catch (TimeoutException te) {
+            // ok
+        }
+        Assert.assertEquals(readKeys.size(), numKeys);
+
+        // seek to start, read all keys again,
+        // assert that we read all keys we had read previously
+        reader.seekAsync(MessageId.earliest).get();
+        try {
+            while (true) { // should break out with TimeoutException
+                try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
+                    Assert.assertTrue(readKeys.remove(extractKey(m)));
+                }
+            }
+        } catch (TimeoutException te) {
+            // ok
+        }
+        Assert.assertTrue(readKeys.isEmpty());
+    }
+
+    @Test
+    public void testSeekToMiddle() throws Exception {
+        int numKeys = 10;
+        String topic = "persistent://my-property/use/my-ns/my-raw-topic";
+
+        publishMessages(topic, numKeys);
+
+        Set<String> readKeys = new HashSet<>();
+        RawReader reader = RawReader.create(pulsarClient, topic).get();
+        int i = 0;
+        MessageId seekTo = null;
+        try {
+            while (true) { // should break out with TimeoutException
+                try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
+                    i++;
+                    if (i > numKeys/2) {
+                        if (seekTo == null) {
+                            seekTo = m.getMessageId();
+                        }
+                        readKeys.add(extractKey(m));
+                    }
+                }
+            }
+        } catch (TimeoutException te) {
+            // ok
+        }
+        Assert.assertEquals(readKeys.size(), numKeys/2);
+
+        // seek to middle, read all keys again,
+        // assert that we read all keys we had read previously
+        reader.seekAsync(seekTo).get();
+        try {
+            while (true) { // should break out with TimeoutException
+                try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
+                    Assert.assertTrue(readKeys.remove(extractKey(m)));
+                }
+            }
+        } catch (TimeoutException te) {
+            // ok
+        }
+        Assert.assertTrue(readKeys.isEmpty());
+    }
+
+    /**
+     * Try to fill the receiver queue, and drain it multiple times
+     */
+    @Test
+    public void testFlowControl() throws Exception {
+        int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
+        String topic = "persistent://my-property/use/my-ns/my-raw-topic";
+
+        publishMessages(topic, numMessages);
+
+        RawReader reader = RawReader.create(pulsarClient, topic).get();
+        List<Future<RawMessage>> futures = new ArrayList<>();
+        Set<String> keys = new HashSet<>();
+
+        // +1 to make sure we read past the end
+        for (int i = 0; i < numMessages + 1; i++) {
+            futures.add(reader.readNextAsync());
+        }
+        int timeouts = 0;
+        for (Future<RawMessage> f : futures) {
+            try (RawMessage m = f.get(1, TimeUnit.SECONDS)) {
+                // Assert each key is unique
+                Assert.assertTrue(keys.add(extractKey(m)));
+            } catch (TimeoutException te) {
+                timeouts++;
+            }
+        }
+        Assert.assertEquals(timeouts, 1);
+        Assert.assertEquals(keys.size(), numMessages);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 19f922a8a..b4f76c008 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -969,7 +969,7 @@ protected synchronized void messageProcessed(Message msg) {
         }
     }
 
-    private void increaseAvailablePermits(ClientCnx currentCnx) {
+    void increaseAvailablePermits(ClientCnx currentCnx) {
         increaseAvailablePermits(currentCnx, 1);
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message