From commits-return-8777-archive-asf-public=cust-asf.ponee.io@pulsar.incubator.apache.org Fri Jun 1 08:19:16 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id BB16E18063A for ; Fri, 1 Jun 2018 08:19:15 +0200 (CEST) Received: (qmail 80714 invoked by uid 500); 1 Jun 2018 06:19:14 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 80704 invoked by uid 99); 1 Jun 2018 06:19:14 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Jun 2018 06:19:14 +0000 From: GitBox To: commits@pulsar.apache.org Subject: [GitHub] merlimat closed pull request #1882: Fixed event time metadata on batched messages Message-ID: <152783395415.17176.13866871087992660794.gitbox@gitbox.apache.org> Date: Fri, 01 Jun 2018 06:19:14 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit merlimat closed pull request #1882: Fixed event time metadata on batched messages URL: https://github.com/apache/incubator-pulsar/pull/1882 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 76b9a77057..a5beefaceb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -58,6 +58,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -1389,4 +1390,20 @@ public void testGetTopicIfExists() throws Exception { Optional t = pulsar.getBrokerService().getTopicReference(topicName); assertFalse(t.isPresent()); } + + @Test + public void testWithEventTime() throws Exception { + final String topicName = "prop/ns-abc/topic-event-time"; + final String subName = "sub"; + + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName) + .subscribe(); + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + + producer.newMessage().value("test").eventTime(5).send(); + Message msg = consumer.receive(); + assertNotNull(msg); + assertEquals(msg.getValue(), "test"); + assertEquals(msg.getEventTime(), 5); + } } diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 15ffe4107c..27297b5193 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -416,6 +416,10 @@ void Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, Shar metadata.mutable_properties()->AddAllocated(keyValue); } + if (msg.impl_->getEventTimestamp() != 0) { + metadata.set_event_time(msg.impl_->getEventTimestamp()); + } + // Format of batch message // Each Message = [METADATA_SIZE][METADATA] [PAYLOAD] diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc index 6b46fd0773..683e718ac2 100644 --- a/pulsar-client-cpp/lib/Message.cc +++ b/pulsar-client-cpp/lib/Message.cc @@ -79,6 +79,14 @@ Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, S impl_->metadata = metadata; impl_->payload = payload; impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties()); + + if (singleMetadata.has_partition_key()) { + impl_->metadata.set_partition_key(singleMetadata.partition_key()); + } + + if (singleMetadata.has_event_time()) { + impl_->metadata.set_event_time(singleMetadata.event_time()); + } } const MessageId& Message::getMessageId() const { diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index d4f6a0b058..f9a6592569 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -1303,3 +1303,29 @@ TEST(BasicEndToEndTest, testEncryptionFailure) { // Since messag is discarded, no message will be received. ASSERT_EQ(ResultTimeout, consumer.receive(msgReceived, 5000)); } + +TEST(BasicEndToEndTest, testEventTime) { + ClientConfiguration config; + Client client(lookupUrl, config); + std::string topicName = "persistent://prop/unit/ns1/topic"; + Producer producer; + ProducerConfiguration producerConf; + producerConf.setBatchingEnabled(true); + Result result = client.createProducer(topicName, producerConf, producer); + ASSERT_EQ(ResultOk, result); + + Consumer consumer; + result = client.subscribe(topicName, "sub", consumer); + ASSERT_EQ(ResultOk, result); + + producer.send(MessageBuilder().setContent("test").setEventTimestamp(5).build()); + + Message msg; + result = consumer.receive(msg); + ASSERT_EQ(ResultOk, result); + + ASSERT_EQ(msg.getEventTimestamp(), 5); + + consumer.close(); + producer.close(); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 7001530b59..c019112c33 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -121,6 +121,10 @@ msgMetadataBuilder.setPartitionKey(singleMessageMetadata.getPartitionKey()); } + if (singleMessageMetadata.hasEventTime()) { + msgMetadataBuilder.setEventTime(singleMessageMetadata.getEventTime()); + } + this.schema = schema; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index dea948ce00..84d021881b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -18,10 +18,10 @@ */ package org.apache.pulsar.common.api; -import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFrom; -import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8; import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum; import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum; +import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFrom; +import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8; import com.google.common.annotations.VisibleForTesting; @@ -980,6 +980,11 @@ public static ByteBuf serializeSingleMessageInBatchWithPayload(PulsarApi.Message singleMessageMetadataBuilder = singleMessageMetadataBuilder .addAllProperties(msgBuilder.getPropertiesList()); } + + if (msgBuilder.hasEventTime()) { + singleMessageMetadataBuilder.setEventTime(msgBuilder.getEventTime()); + } + try { return serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, payload, batchBuffer); } finally { @@ -997,6 +1002,7 @@ public static ByteBuf deSerializeSingleMessageInBatch(ByteBuf uncompressedPayloa ByteBufCodedInputStream stream = ByteBufCodedInputStream.get(uncompressedPayload); PulsarApi.SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.mergeFrom(stream, null) .build(); + int singleMessagePayloadSize = singleMessageMetadata.getPayloadSize(); uncompressedPayload.markReaderIndex(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 573ed5080b..ce0124b784 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -4544,6 +4544,10 @@ public Builder clearSchemaVersion() { // optional bool compacted_out = 4 [default = false]; boolean hasCompactedOut(); boolean getCompactedOut(); + + // optional uint64 event_time = 5 [default = 0]; + boolean hasEventTime(); + long getEventTime(); } public static final class SingleMessageMetadata extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -4653,11 +4657,22 @@ public boolean getCompactedOut() { return compactedOut_; } + // optional uint64 event_time = 5 [default = 0]; + public static final int EVENT_TIME_FIELD_NUMBER = 5; + private long eventTime_; + public boolean hasEventTime() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public long getEventTime() { + return eventTime_; + } + private void initFields() { properties_ = java.util.Collections.emptyList(); partitionKey_ = ""; payloadSize_ = 0; compactedOut_ = false; + eventTime_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4698,6 +4713,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBool(4, compactedOut_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeUInt64(5, eventTime_); + } } private int memoizedSerializedSize = -1; @@ -4722,6 +4740,10 @@ public int getSerializedSize() { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeBoolSize(4, compactedOut_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeUInt64Size(5, eventTime_); + } memoizedSerializedSize = size; return size; } @@ -4843,6 +4865,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000004); compactedOut_ = false; bitField0_ = (bitField0_ & ~0x00000008); + eventTime_ = 0L; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -4893,6 +4917,10 @@ public Builder clone() { to_bitField0_ |= 0x00000004; } result.compactedOut_ = compactedOut_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000008; + } + result.eventTime_ = eventTime_; result.bitField0_ = to_bitField0_; return result; } @@ -4918,6 +4946,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.SingleMess if (other.hasCompactedOut()) { setCompactedOut(other.getCompactedOut()); } + if (other.hasEventTime()) { + setEventTime(other.getEventTime()); + } return this; } @@ -4978,6 +5009,11 @@ public Builder mergeFrom( compactedOut_ = input.readBool(); break; } + case 40: { + bitField0_ |= 0x00000010; + eventTime_ = input.readUInt64(); + break; + } } } } @@ -5151,6 +5187,27 @@ public Builder clearCompactedOut() { return this; } + // optional uint64 event_time = 5 [default = 0]; + private long eventTime_ ; + public boolean hasEventTime() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public long getEventTime() { + return eventTime_; + } + public Builder setEventTime(long value) { + bitField0_ |= 0x00000010; + eventTime_ = value; + + return this; + } + public Builder clearEventTime() { + bitField0_ = (bitField0_ & ~0x00000010); + eventTime_ = 0L; + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.SingleMessageMetadata) } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 0e55287d02..3f9de1a26d 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -103,7 +103,11 @@ message SingleMessageMetadata { repeated KeyValue properties = 1; optional string partition_key = 2; required int32 payload_size = 3; - optional bool compacted_out = 4 [default = false]; + optional bool compacted_out = 4 [default = false]; + + // the timestamp that this event occurs. it is typically set by applications. + // if this field is omitted, `publish_time` can be used for the purpose of `event_time`. + optional uint64 event_time = 5 [default = 0]; } enum ServerError { @@ -227,7 +231,7 @@ message CommandSubscribe { message CommandPartitionedTopicMetadata { required string topic = 1; required uint64 request_id = 2; - // TODO - Remove original_principal, original_auth_data, original_auth_method + // TODO - Remove original_principal, original_auth_data, original_auth_method // Original principal that was verified by // a Pulsar proxy. optional string original_principal = 3; @@ -255,7 +259,7 @@ message CommandLookupTopic { required uint64 request_id = 2; optional bool authoritative = 3 [default = false]; - // TODO - Remove original_principal, original_auth_data, original_auth_method + // TODO - Remove original_principal, original_auth_data, original_auth_method // Original principal that was verified by // a Pulsar proxy. optional string original_principal = 4; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services