This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9d1916d Fixed event time metadata on batched messages (#1882)
9d1916d is described below
commit 9d1916d46148fc3b6752b797065e9c646e14b9b3
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Thu May 31 23:19:12 2018 -0700
Fixed event time metadata on batched messages (#1882)
* Fixed event time metadata on batched messages
* Fixed test
* Fixed cpp test topic name
---
.../broker/service/PersistentTopicE2ETest.java | 17 +++++++
pulsar-client-cpp/lib/Commands.cc | 4 ++
pulsar-client-cpp/lib/Message.cc | 8 +++
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 26 ++++++++++
.../org/apache/pulsar/client/impl/MessageImpl.java | 4 ++
.../org/apache/pulsar/common/api/Commands.java | 10 +++-
.../apache/pulsar/common/api/proto/PulsarApi.java | 57 ++++++++++++++++++++++
pulsar-common/src/main/proto/PulsarApi.proto | 10 ++--
8 files changed, 131 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 76b9a77..a5beefa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -1389,4 +1390,20 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
Optional<Topic> t = pulsar.getBrokerService().getTopicReference(topicName);
assertFalse(t.isPresent());
}
+
+ @Test
+ public void testWithEventTime() throws Exception {
+ final String topicName = "prop/ns-abc/topic-event-time";
+ final String subName = "sub";
+
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName)
+ .subscribe();
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+
+ producer.newMessage().value("test").eventTime(5).send();
+ Message<String> msg = consumer.receive();
+ assertNotNull(msg);
+ assertEquals(msg.getValue(), "test");
+ assertEquals(msg.getEventTime(), 5);
+ }
}
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 15ffe41..27297b5 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -416,6 +416,10 @@ void Commands::serializeSingleMessageInBatchWithPayload(const Message&
msg, Shar
metadata.mutable_properties()->AddAllocated(keyValue);
}
+ if (msg.impl_->getEventTimestamp() != 0) {
+ metadata.set_event_time(msg.impl_->getEventTimestamp());
+ }
+
// Format of batch message
// Each Message = [METADATA_SIZE][METADATA] [PAYLOAD]
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 6b46fd0..683e718 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -79,6 +79,14 @@ Message::Message(const MessageId& messageID, proto::MessageMetadata&
metadata, S
impl_->metadata = metadata;
impl_->payload = payload;
impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
+
+ if (singleMetadata.has_partition_key()) {
+ impl_->metadata.set_partition_key(singleMetadata.partition_key());
+ }
+
+ if (singleMetadata.has_event_time()) {
+ impl_->metadata.set_event_time(singleMetadata.event_time());
+ }
}
const MessageId& Message::getMessageId() const {
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index d4f6a0b..f9a6592 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1303,3 +1303,29 @@ TEST(BasicEndToEndTest, testEncryptionFailure) {
// Since messag is discarded, no message will be received.
ASSERT_EQ(ResultTimeout, consumer.receive(msgReceived, 5000));
}
+
+TEST(BasicEndToEndTest, testEventTime) {
+ ClientConfiguration config;
+ Client client(lookupUrl, config);
+ std::string topicName = "persistent://prop/unit/ns1/topic";
+ Producer producer;
+ ProducerConfiguration producerConf;
+ producerConf.setBatchingEnabled(true);
+ Result result = client.createProducer(topicName, producerConf, producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Consumer consumer;
+ result = client.subscribe(topicName, "sub", consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ producer.send(MessageBuilder().setContent("test").setEventTimestamp(5).build());
+
+ Message msg;
+ result = consumer.receive(msg);
+ ASSERT_EQ(ResultOk, result);
+
+ ASSERT_EQ(msg.getEventTimestamp(), 5);
+
+ consumer.close();
+ producer.close();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 7001530..c019112 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -121,6 +121,10 @@ public class MessageImpl<T> extends MessageRecordImpl<T, MessageId>
{
msgMetadataBuilder.setPartitionKey(singleMessageMetadata.getPartitionKey());
}
+ if (singleMessageMetadata.hasEventTime()) {
+ msgMetadataBuilder.setEventTime(singleMessageMetadata.getEventTime());
+ }
+
this.schema = schema;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index dea948c..84d0218 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -18,10 +18,10 @@
*/
package org.apache.pulsar.common.api;
-import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFrom;
-import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8;
import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum;
+import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFrom;
+import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8;
import com.google.common.annotations.VisibleForTesting;
@@ -980,6 +980,11 @@ public class Commands {
singleMessageMetadataBuilder = singleMessageMetadataBuilder
.addAllProperties(msgBuilder.getPropertiesList());
}
+
+ if (msgBuilder.hasEventTime()) {
+ singleMessageMetadataBuilder.setEventTime(msgBuilder.getEventTime());
+ }
+
try {
return serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
payload, batchBuffer);
} finally {
@@ -997,6 +1002,7 @@ public class Commands {
ByteBufCodedInputStream stream = ByteBufCodedInputStream.get(uncompressedPayload);
PulsarApi.SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.mergeFrom(stream,
null)
.build();
+
int singleMessagePayloadSize = singleMessageMetadata.getPayloadSize();
uncompressedPayload.markReaderIndex();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 573ed50..ce0124b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -4544,6 +4544,10 @@ public final class PulsarApi {
// optional bool compacted_out = 4 [default = false];
boolean hasCompactedOut();
boolean getCompactedOut();
+
+ // optional uint64 event_time = 5 [default = 0];
+ boolean hasEventTime();
+ long getEventTime();
}
public static final class SingleMessageMetadata extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -4653,11 +4657,22 @@ public final class PulsarApi {
return compactedOut_;
}
+ // optional uint64 event_time = 5 [default = 0];
+ public static final int EVENT_TIME_FIELD_NUMBER = 5;
+ private long eventTime_;
+ public boolean hasEventTime() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public long getEventTime() {
+ return eventTime_;
+ }
+
private void initFields() {
properties_ = java.util.Collections.emptyList();
partitionKey_ = "";
payloadSize_ = 0;
compactedOut_ = false;
+ eventTime_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -4698,6 +4713,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBool(4, compactedOut_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeUInt64(5, eventTime_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -4722,6 +4740,10 @@ public final class PulsarApi {
size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeBoolSize(4, compactedOut_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(5, eventTime_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -4843,6 +4865,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000004);
compactedOut_ = false;
bitField0_ = (bitField0_ & ~0x00000008);
+ eventTime_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000010);
return this;
}
@@ -4893,6 +4917,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000004;
}
result.compactedOut_ = compactedOut_;
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.eventTime_ = eventTime_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -4918,6 +4946,9 @@ public final class PulsarApi {
if (other.hasCompactedOut()) {
setCompactedOut(other.getCompactedOut());
}
+ if (other.hasEventTime()) {
+ setEventTime(other.getEventTime());
+ }
return this;
}
@@ -4978,6 +5009,11 @@ public final class PulsarApi {
compactedOut_ = input.readBool();
break;
}
+ case 40: {
+ bitField0_ |= 0x00000010;
+ eventTime_ = input.readUInt64();
+ break;
+ }
}
}
}
@@ -5151,6 +5187,27 @@ public final class PulsarApi {
return this;
}
+ // optional uint64 event_time = 5 [default = 0];
+ private long eventTime_ ;
+ public boolean hasEventTime() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public long getEventTime() {
+ return eventTime_;
+ }
+ public Builder setEventTime(long value) {
+ bitField0_ |= 0x00000010;
+ eventTime_ = value;
+
+ return this;
+ }
+ public Builder clearEventTime() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ eventTime_ = 0L;
+
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.SingleMessageMetadata)
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 0e55287..3f9de1a 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -103,7 +103,11 @@ message SingleMessageMetadata {
repeated KeyValue properties = 1;
optional string partition_key = 2;
required int32 payload_size = 3;
- optional bool compacted_out = 4 [default = false];
+ optional bool compacted_out = 4 [default = false];
+
+ // the timestamp that this event occurs. it is typically set by applications.
+ // if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
+ optional uint64 event_time = 5 [default = 0];
}
enum ServerError {
@@ -227,7 +231,7 @@ message CommandSubscribe {
message CommandPartitionedTopicMetadata {
required string topic = 1;
required uint64 request_id = 2;
- // TODO - Remove original_principal, original_auth_data, original_auth_method
+ // TODO - Remove original_principal, original_auth_data, original_auth_method
// Original principal that was verified by
// a Pulsar proxy.
optional string original_principal = 3;
@@ -255,7 +259,7 @@ message CommandLookupTopic {
required uint64 request_id = 2;
optional bool authoritative = 3 [default = false];
- // TODO - Remove original_principal, original_auth_data, original_auth_method
+ // TODO - Remove original_principal, original_auth_data, original_auth_method
// Original principal that was verified by
// a Pulsar proxy.
optional string original_principal = 4;
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.
|