pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] merlimat closed pull request #1882: Fixed event time metadata on batched messages
Date Fri, 01 Jun 2018 06:19:14 GMT
merlimat closed pull request #1882: Fixed event time metadata on batched messages
URL: https://github.com/apache/incubator-pulsar/pull/1882
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 76b9a77057..a5beefaceb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -58,6 +58,7 @@
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -1389,4 +1390,20 @@ public void testGetTopicIfExists() throws Exception {
         Optional<Topic> t = pulsar.getBrokerService().getTopicReference(topicName);
         assertFalse(t.isPresent());
     }
+
+    @Test
+    public void testWithEventTime() throws Exception {
+        final String topicName = "prop/ns-abc/topic-event-time";
+        final String subName = "sub";
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName)
+                .subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+
+        producer.newMessage().value("test").eventTime(5).send();
+        Message<String> msg = consumer.receive();
+        assertNotNull(msg);
+        assertEquals(msg.getValue(), "test");
+        assertEquals(msg.getEventTime(), 5);
+    }
 }
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 15ffe4107c..27297b5193 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -416,6 +416,10 @@ void Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, Shar
         metadata.mutable_properties()->AddAllocated(keyValue);
     }
 
+    if (msg.impl_->getEventTimestamp() != 0) {
+        metadata.set_event_time(msg.impl_->getEventTimestamp());
+    }
+
     // Format of batch message
     // Each Message = [METADATA_SIZE][METADATA] [PAYLOAD]
 
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 6b46fd0773..683e718ac2 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -79,6 +79,14 @@ Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, S
     impl_->metadata = metadata;
     impl_->payload = payload;
     impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
+
+    if (singleMetadata.has_partition_key()) {
+        impl_->metadata.set_partition_key(singleMetadata.partition_key());
+    }
+
+    if (singleMetadata.has_event_time()) {
+        impl_->metadata.set_event_time(singleMetadata.event_time());
+    }
 }
 
 const MessageId& Message::getMessageId() const {
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index d4f6a0b058..f9a6592569 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1303,3 +1303,29 @@ TEST(BasicEndToEndTest, testEncryptionFailure) {
     // Since messag is discarded, no message will be received.
     ASSERT_EQ(ResultTimeout, consumer.receive(msgReceived, 5000));
 }
+
+TEST(BasicEndToEndTest, testEventTime) {
+    ClientConfiguration config;
+    Client client(lookupUrl, config);
+    std::string topicName = "persistent://prop/unit/ns1/topic";
+    Producer producer;
+    ProducerConfiguration producerConf;
+    producerConf.setBatchingEnabled(true);
+    Result result = client.createProducer(topicName, producerConf, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    result = client.subscribe(topicName, "sub", consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    producer.send(MessageBuilder().setContent("test").setEventTimestamp(5).build());
+
+    Message msg;
+    result = consumer.receive(msg);
+    ASSERT_EQ(ResultOk, result);
+
+    ASSERT_EQ(msg.getEventTimestamp(), 5);
+
+    consumer.close();
+    producer.close();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 7001530b59..c019112c33 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -121,6 +121,10 @@
             msgMetadataBuilder.setPartitionKey(singleMessageMetadata.getPartitionKey());
         }
 
+        if (singleMessageMetadata.hasEventTime()) {
+            msgMetadataBuilder.setEventTime(singleMessageMetadata.getEventTime());
+        }
+
         this.schema = schema;
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index dea948ce00..84d021881b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pulsar.common.api;
 
-import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFrom;
-import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8;
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum;
+import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFrom;
+import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -980,6 +980,11 @@ public static ByteBuf serializeSingleMessageInBatchWithPayload(PulsarApi.Message
             singleMessageMetadataBuilder = singleMessageMetadataBuilder
                     .addAllProperties(msgBuilder.getPropertiesList());
         }
+
+        if (msgBuilder.hasEventTime()) {
+            singleMessageMetadataBuilder.setEventTime(msgBuilder.getEventTime());
+        }
+
         try {
             return serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, payload, batchBuffer);
         } finally {
@@ -997,6 +1002,7 @@ public static ByteBuf deSerializeSingleMessageInBatch(ByteBuf uncompressedPayloa
         ByteBufCodedInputStream stream = ByteBufCodedInputStream.get(uncompressedPayload);
         PulsarApi.SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.mergeFrom(stream, null)
                 .build();
+
         int singleMessagePayloadSize = singleMessageMetadata.getPayloadSize();
 
         uncompressedPayload.markReaderIndex();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 573ed5080b..ce0124b784 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -4544,6 +4544,10 @@ public Builder clearSchemaVersion() {
     // optional bool compacted_out = 4 [default = false];
     boolean hasCompactedOut();
     boolean getCompactedOut();
+    
+    // optional uint64 event_time = 5 [default = 0];
+    boolean hasEventTime();
+    long getEventTime();
   }
   public static final class SingleMessageMetadata extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -4653,11 +4657,22 @@ public boolean getCompactedOut() {
       return compactedOut_;
     }
     
+    // optional uint64 event_time = 5 [default = 0];
+    public static final int EVENT_TIME_FIELD_NUMBER = 5;
+    private long eventTime_;
+    public boolean hasEventTime() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public long getEventTime() {
+      return eventTime_;
+    }
+    
     private void initFields() {
       properties_ = java.util.Collections.emptyList();
       partitionKey_ = "";
       payloadSize_ = 0;
       compactedOut_ = false;
+      eventTime_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4698,6 +4713,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBool(4, compactedOut_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeUInt64(5, eventTime_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -4722,6 +4740,10 @@ public int getSerializedSize() {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeBoolSize(4, compactedOut_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(5, eventTime_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -4843,6 +4865,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000004);
         compactedOut_ = false;
         bitField0_ = (bitField0_ & ~0x00000008);
+        eventTime_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
       
@@ -4893,6 +4917,10 @@ public Builder clone() {
           to_bitField0_ |= 0x00000004;
         }
         result.compactedOut_ = compactedOut_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.eventTime_ = eventTime_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -4918,6 +4946,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.SingleMess
         if (other.hasCompactedOut()) {
           setCompactedOut(other.getCompactedOut());
         }
+        if (other.hasEventTime()) {
+          setEventTime(other.getEventTime());
+        }
         return this;
       }
       
@@ -4978,6 +5009,11 @@ public Builder mergeFrom(
               compactedOut_ = input.readBool();
               break;
             }
+            case 40: {
+              bitField0_ |= 0x00000010;
+              eventTime_ = input.readUInt64();
+              break;
+            }
           }
         }
       }
@@ -5151,6 +5187,27 @@ public Builder clearCompactedOut() {
         return this;
       }
       
+      // optional uint64 event_time = 5 [default = 0];
+      private long eventTime_ ;
+      public boolean hasEventTime() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      public long getEventTime() {
+        return eventTime_;
+      }
+      public Builder setEventTime(long value) {
+        bitField0_ |= 0x00000010;
+        eventTime_ = value;
+        
+        return this;
+      }
+      public Builder clearEventTime() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        eventTime_ = 0L;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.SingleMessageMetadata)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 0e55287d02..3f9de1a26d 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -103,7 +103,11 @@ message SingleMessageMetadata {
 	repeated KeyValue properties    = 1;
 	optional string partition_key   = 2;
 	required int32 payload_size	= 3;
-        optional bool compacted_out     = 4 [default = false];
+	optional bool compacted_out     = 4 [default = false];
+
+	// the timestamp that this event occurs. it is typically set by applications.
+	// if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
+	optional uint64 event_time = 5 [default = 0];
 }
 
 enum ServerError {
@@ -227,7 +231,7 @@ message CommandSubscribe {
 message CommandPartitionedTopicMetadata {
 	required string topic            = 1;
 	required uint64 request_id       = 2;
-	// TODO - Remove original_principal, original_auth_data, original_auth_method 
+	// TODO - Remove original_principal, original_auth_data, original_auth_method
 	// Original principal that was verified by
 	// a Pulsar proxy.
 	optional string original_principal = 3;
@@ -255,7 +259,7 @@ message CommandLookupTopic {
 	required uint64 request_id       = 2;
 	optional bool authoritative      = 3 [default = false];
 
-	// TODO - Remove original_principal, original_auth_data, original_auth_method 
+	// TODO - Remove original_principal, original_auth_data, original_auth_method
 	// Original principal that was verified by
 	// a Pulsar proxy.
 	optional string original_principal = 4;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message