pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [incubator-pulsar] 03/03: Fixed event time metadata on batched messages (#1882)
Date Fri, 01 Jun 2018 06:25:59 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit 8c1e779b803812cd4bae8c51aa99036ebb4fbf5f
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Thu May 31 23:19:12 2018 -0700

    Fixed event time metadata on batched messages (#1882)
    
    * Fixed event time metadata on batched messages
    
    * Fixed test
    
    * Fixed cpp test topic name
---
 .../broker/service/PersistentTopicE2ETest.java     | 17 +++++++
 pulsar-client-cpp/lib/Commands.cc                  |  4 ++
 pulsar-client-cpp/lib/Message.cc                   |  8 +++
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       | 26 ++++++++++
 .../org/apache/pulsar/client/impl/MessageImpl.java |  4 ++
 .../org/apache/pulsar/common/api/Commands.java     |  6 +++
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 57 ++++++++++++++++++++++
 pulsar-common/src/main/proto/PulsarApi.proto       |  6 ++-
 8 files changed, 127 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 76b9a77..a5beefa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -1389,4 +1390,20 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         Optional<Topic> t = pulsar.getBrokerService().getTopicReference(topicName);
         assertFalse(t.isPresent());
     }
+
+    @Test
+    public void testWithEventTime() throws Exception {
+        final String topicName = "prop/ns-abc/topic-event-time";
+        final String subName = "sub";
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName)
+                .subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+
+        producer.newMessage().value("test").eventTime(5).send();
+        Message<String> msg = consumer.receive();
+        assertNotNull(msg);
+        assertEquals(msg.getValue(), "test");
+        assertEquals(msg.getEventTime(), 5);
+    }
 }
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 15ffe41..27297b5 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -416,6 +416,10 @@ void Commands::serializeSingleMessageInBatchWithPayload(const Message&
msg, Shar
         metadata.mutable_properties()->AddAllocated(keyValue);
     }
 
+    if (msg.impl_->getEventTimestamp() != 0) {
+        metadata.set_event_time(msg.impl_->getEventTimestamp());
+    }
+
     // Format of batch message
     // Each Message = [METADATA_SIZE][METADATA] [PAYLOAD]
 
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 6b46fd0..683e718 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -79,6 +79,14 @@ Message::Message(const MessageId& messageID, proto::MessageMetadata&
metadata, S
     impl_->metadata = metadata;
     impl_->payload = payload;
     impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
+
+    if (singleMetadata.has_partition_key()) {
+        impl_->metadata.set_partition_key(singleMetadata.partition_key());
+    }
+
+    if (singleMetadata.has_event_time()) {
+        impl_->metadata.set_event_time(singleMetadata.event_time());
+    }
 }
 
 const MessageId& Message::getMessageId() const {
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 56c1796..ba49f2d 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1303,3 +1303,29 @@ TEST(BasicEndToEndTest, testEncryptionFailure) {
     // Since messag is discarded, no message will be received.
     ASSERT_EQ(ResultTimeout, consumer.receive(msgReceived, 5000));
 }
+
+TEST(BasicEndToEndTest, testEventTime) {
+    ClientConfiguration config;
+    Client client(lookupUrl, config);
+    std::string topicName = "persistent://prop/unit/ns1/topic";
+    Producer producer;
+    ProducerConfiguration producerConf;
+    producerConf.setBatchingEnabled(true);
+    Result result = client.createProducer(topicName, producerConf, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    result = client.subscribe(topicName, "sub", consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    producer.send(MessageBuilder().setContent("test").setEventTimestamp(5).build());
+
+    Message msg;
+    result = consumer.receive(msg);
+    ASSERT_EQ(ResultOk, result);
+
+    ASSERT_EQ(msg.getEventTimestamp(), 5);
+
+    consumer.close();
+    producer.close();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 7001530..c019112 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -121,6 +121,10 @@ public class MessageImpl<T> extends MessageRecordImpl<T, MessageId>
{
             msgMetadataBuilder.setPartitionKey(singleMessageMetadata.getPartitionKey());
         }
 
+        if (singleMessageMetadata.hasEventTime()) {
+            msgMetadataBuilder.setEventTime(singleMessageMetadata.getEventTime());
+        }
+
         this.schema = schema;
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 1228876..f837f57 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -961,6 +961,11 @@ public class Commands {
             singleMessageMetadataBuilder = singleMessageMetadataBuilder
                     .addAllProperties(msgBuilder.getPropertiesList());
         }
+
+        if (msgBuilder.hasEventTime()) {
+            singleMessageMetadataBuilder.setEventTime(msgBuilder.getEventTime());
+        }
+
         try {
             return serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
payload, batchBuffer);
         } finally {
@@ -978,6 +983,7 @@ public class Commands {
         ByteBufCodedInputStream stream = ByteBufCodedInputStream.get(uncompressedPayload);
         PulsarApi.SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.mergeFrom(stream,
null)
                 .build();
+
         int singleMessagePayloadSize = singleMessageMetadata.getPayloadSize();
 
         uncompressedPayload.markReaderIndex();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index e1253d9..8046e6e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -4556,6 +4556,10 @@ public final class PulsarApi {
     // optional bool compacted_out = 4 [default = false];
     boolean hasCompactedOut();
     boolean getCompactedOut();
+    
+    // optional uint64 event_time = 5 [default = 0];
+    boolean hasEventTime();
+    long getEventTime();
   }
   public static final class SingleMessageMetadata extends
       com.google.protobuf.GeneratedMessageLite
@@ -4667,11 +4671,22 @@ public final class PulsarApi {
       return compactedOut_;
     }
     
+    // optional uint64 event_time = 5 [default = 0];
+    public static final int EVENT_TIME_FIELD_NUMBER = 5;
+    private long eventTime_;
+    public boolean hasEventTime() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public long getEventTime() {
+      return eventTime_;
+    }
+    
     private void initFields() {
       properties_ = java.util.Collections.emptyList();
       partitionKey_ = "";
       payloadSize_ = 0;
       compactedOut_ = false;
+      eventTime_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4712,6 +4727,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBool(4, compactedOut_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeUInt64(5, eventTime_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -4736,6 +4754,10 @@ public final class PulsarApi {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(4, compactedOut_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(5, eventTime_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -4857,6 +4879,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000004);
         compactedOut_ = false;
         bitField0_ = (bitField0_ & ~0x00000008);
+        eventTime_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
       
@@ -4907,6 +4931,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000004;
         }
         result.compactedOut_ = compactedOut_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.eventTime_ = eventTime_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -4932,6 +4960,9 @@ public final class PulsarApi {
         if (other.hasCompactedOut()) {
           setCompactedOut(other.getCompactedOut());
         }
+        if (other.hasEventTime()) {
+          setEventTime(other.getEventTime());
+        }
         return this;
       }
       
@@ -4992,6 +5023,11 @@ public final class PulsarApi {
               compactedOut_ = input.readBool();
               break;
             }
+            case 40: {
+              bitField0_ |= 0x00000010;
+              eventTime_ = input.readUInt64();
+              break;
+            }
           }
         }
       }
@@ -5165,6 +5201,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional uint64 event_time = 5 [default = 0];
+      private long eventTime_ ;
+      public boolean hasEventTime() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      public long getEventTime() {
+        return eventTime_;
+      }
+      public Builder setEventTime(long value) {
+        bitField0_ |= 0x00000010;
+        eventTime_ = value;
+        
+        return this;
+      }
+      public Builder clearEventTime() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        eventTime_ = 0L;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.SingleMessageMetadata)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 7e76521..375cffd 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -103,7 +103,11 @@ message SingleMessageMetadata {
 	repeated KeyValue properties    = 1;
 	optional string partition_key   = 2;
 	required int32 payload_size	= 3;
-        optional bool compacted_out     = 4 [default = false];
+	optional bool compacted_out     = 4 [default = false];
+
+	// the timestamp that this event occurs. it is typically set by applications.
+	// if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
+	optional uint64 event_time = 5 [default = 0];
 }
 
 enum ServerError {

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.

Mime
View raw message