From commits-return-13401-archive-asf-public=cust-asf.ponee.io@pulsar.incubator.apache.org Mon Aug 27 20:18:41 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 394BD180675 for ; Mon, 27 Aug 2018 20:18:41 +0200 (CEST) Received: (qmail 51876 invoked by uid 500); 27 Aug 2018 18:18:40 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 51867 invoked by uid 99); 27 Aug 2018 18:18:40 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Aug 2018 18:18:40 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id B6D53807CF; Mon, 27 Aug 2018 18:18:39 +0000 (UTC) Date: Mon, 27 Aug 2018 18:18:39 +0000 To: "commits@pulsar.apache.org" Subject: [incubator-pulsar] branch branch-2.1 updated: [compaction] make topic compaction works with partitioned topic (#2367) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <153539391958.11563.7906966948296349223@gitbox.apache.org> From: sijie@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pulsar X-Git-Refname: refs/heads/branch-2.1 X-Git-Reftype: branch X-Git-Oldrev: bcb3d3f6d15995b74c51d0507fece569361928da X-Git-Newrev: f97a8f7d136ddc609d01ce54e20f45e9949b04df X-Git-Rev: f97a8f7d136ddc609d01ce54e20f45e9949b04df X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new f97a8f7 [compaction] make topic compaction works with partitioned topic (#2367) f97a8f7 is described below commit f97a8f7d136ddc609d01ce54e20f45e9949b04df Author: Sijie Guo AuthorDate: Tue Aug 14 11:02:24 2018 -0700 [compaction] make topic compaction works with partitioned topic (#2367) * [compaction] make topic compaction works with partitioned topic ### Motivation Topic compaction doesn't work with partitioned topics. ### Changes - make `RawReaderImpl` and `ReaderImpl` return message with partition idx - make broker service `Consumer` deliver MessageIdData with partition idx - add an integration test to ensure compaction works with partitioned topics --- .../java/org/apache/pulsar/broker/service/Consumer.java | 9 +++++++-- .../org/apache/pulsar/client/impl/RawReaderImpl.java | 16 ++++++++++++---- .../org/apache/pulsar/compaction/TwoPhaseCompactor.java | 8 ++++---- .../PersistentDispatcherFailoverConsumerTest.java | 2 +- .../java/org/apache/pulsar/client/impl/ReaderImpl.java | 4 +++- 5 files changed, 27 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index c3befa5..4c54018 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -73,6 +73,7 @@ public class Consumer { private final String appId; private AuthenticationDataSource authenticationData; private final String topicName; + private final int partitionIdx; private final InitialPosition subscriptionInitialPosition; private final long consumerId; @@ -119,6 +120,7 @@ public class Consumer { this.subscription = subscription; this.subType = subType; this.topicName = 
topicName; + this.partitionIdx = TopicName.getPartitionIndex(topicName); this.consumerId = consumerId; this.priorityLevel = priorityLevel; this.readCompacted = readCompacted; @@ -239,8 +241,11 @@ public class Consumer { Entry entry = entries.get(i); PositionImpl pos = (PositionImpl) entry.getPosition(); MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder(); - MessageIdData messageId = messageIdBuilder.setLedgerId(pos.getLedgerId()).setEntryId(pos.getEntryId()) - .build(); + MessageIdData messageId = messageIdBuilder + .setLedgerId(pos.getLedgerId()) + .setEntryId(pos.getEntryId()) + .setPartition(partitionIdx) + .build(); ByteBuf metadataAndPayload = entry.getDataBuffer(); // increment ref-count of data and release at the end of process: so, we can get chance to call entry.release diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index e768c3e..ae1a4db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,8 +103,15 @@ public class RawReaderImpl implements RawReader { RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, CompletableFuture> consumerFuture) { - super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1, - consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.BYTES); + 
super(client, + conf.getSingleTopic(), + conf, + client.externalExecutorProvider().getExecutor(), + TopicName.getPartitionIndex(conf.getSingleTopic()), + consumerFuture, + SubscriptionMode.Durable, + MessageId.earliest, + Schema.BYTES); incomingRawMessages = new GrowableArrayBlockingQueue<>(); pendingRawReceives = new ConcurrentLinkedQueue<>(); } @@ -172,8 +180,8 @@ public class RawReaderImpl implements RawReader { @Override void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) { if (log.isDebugEnabled()) { - log.debug("[{}][{}] Received raw message: {}/{}", topic, subscription, - messageId.getLedgerId(), messageId.getEntryId()); + log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, subscription, + messageId.getEntryId(), messageId.getLedgerId(), messageId.getPartition()); } incomingRawMessages.add( new RawMessageAndCnx(new RawMessageImpl(messageId, headersAndPayload), cnx)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index b4ee68a..cc3f710 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -146,11 +146,11 @@ public class TwoPhaseCompactor extends Compactor { private void scheduleTimeout(CompletableFuture future) { Future timeout = scheduler.schedule(() -> { - future.completeExceptionally(new TimeoutException("Timeout")); - }, 10, TimeUnit.SECONDS); + future.completeExceptionally(new TimeoutException("Timeout")); + }, 10, TimeUnit.SECONDS); future.whenComplete((res, exception) -> { - timeout.cancel(true); - }); + timeout.cancel(true); + }); } private CompletableFuture phaseTwo(RawReader reader, MessageId from, MessageId to, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index ccd32e1..6f16733 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -579,7 +579,7 @@ public class PersistentDispatcherFailoverConsumerTest { private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception { Consumer consumer = - new Consumer(null, SubType.Shared, null, id, priority, ""+id, 5000, + new Consumer(null, SubType.Shared, "test-topic", id, priority, ""+id, 5000, serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest); try { consumer.flowPermits(permit); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java index 00f8af0..aafd125 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java @@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; +import org.apache.pulsar.common.naming.TopicName; public class ReaderImpl implements Reader { @@ -82,8 +83,9 @@ public class ReaderImpl implements Reader { consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader()); } + final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName()); consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor, - -1, consumerFuture, SubscriptionMode.NonDurable, 
readerConfiguration.getStartMessageId(), schema); + partitionIdx, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema); } @Override