pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhai...@apache.org
Subject [pulsar] branch master updated: add reset cousor to a specific publish time (#3622)
Date Thu, 28 Feb 2019 14:48:42 GMT
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f376e1  add reset cousor to a specific publish time (#3622)
1f376e1 is described below

commit 1f376e10362df30af35580f072e83d57757013d2
Author: 冉小龙 <rxl5555555@qq.com>
AuthorDate: Thu Feb 28 22:48:37 2019 +0800

    add reset cousor to a specific publish time (#3622)
    
    Signed-off-by: xiaolong.ran ranxiaolong716@gmail.com
    
    Fixes #3446 #3565
    
    Motivation
    Reset the subscription associated with this consumer to a specific publish time.
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 24 +++++++--
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  6 +++
 .../broker/service/SubscriptionSeekTest.java       | 55 +++++++++++++++++++++
 .../org/apache/pulsar/client/api/Consumer.java     | 23 +++++++++
 .../pulsar/client/impl/v1/ConsumerV1Impl.java      |  8 +++
 .../org/apache/pulsar/client/api/Consumer.java     | 23 +++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 39 +++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       | 16 ++++++
 .../org/apache/pulsar/common/api/Commands.java     | 15 ++++++
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 57 ++++++++++++++++++++++
 pulsar-common/src/main/proto/PulsarApi.proto       |  1 +
 .../pulsar/PulsarConsumerSourceTests.java          | 10 ++++
 12 files changed, 273 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index e6819bf..69db016 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1050,14 +1050,15 @@ public class ServerCnx extends PulsarHandler {
         final long requestId = seek.getRequestId();
         CompletableFuture<Consumer> consumerFuture = consumers.get(seek.getConsumerId());
 
-        // Currently only seeking on a message id is supported
-        if (!seek.hasMessageId()) {
+        if (!seek.hasMessageId() && !seek.hasMessagePublishTime()) {
             ctx.writeAndFlush(
-                    Commands.newError(requestId, ServerError.MetadataError, "Message id was
not present"));
+                    Commands.newError(requestId, ServerError.MetadataError, "Message id and
message publish time were not present"));
             return;
         }
 
-        if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally())
{
+        boolean consumerCreated = consumerFuture != null && consumerFuture.isDone()
&& !consumerFuture.isCompletedExceptionally();
+
+        if (consumerCreated && seek.hasMessageId()) {
             Consumer consumer = consumerFuture.getNow(null);
             Subscription subscription = consumer.getSubscription();
             MessageIdData msgIdData = seek.getMessageId();
@@ -1075,6 +1076,21 @@ public class ServerCnx extends PulsarHandler {
                         "Error when resetting subscription: " + ex.getCause().getMessage()));
                 return null;
             });
+        } else if (consumerCreated && seek.hasMessagePublishTime()){
+            Consumer consumer = consumerFuture.getNow(null);
+            Subscription subscription = consumer.getSubscription();
+            long timestamp = seek.getMessagePublishTime();
+
+            subscription.resetCursor(timestamp).thenRun(() -> {
+                log.info("[{}] [{}][{}] Reset subscription to publish time {}", remoteAddress,
+                        subscription.getTopic().getName(), subscription.getName(), timestamp);
+                ctx.writeAndFlush(Commands.newSuccess(requestId));
+            }).exceptionally(ex -> {
+                log.warn("[{}][{}] Failed to reset subscription: {}", remoteAddress, subscription,
ex.getMessage(), ex);
+                ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError,
+                        "Reset subscription to publish time error: " + ex.getCause().getMessage()));
+                return null;
+            });
         } else {
             ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, "Consumer
not found"));
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index bff1043..fa527c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -166,6 +166,12 @@ public class RawReaderImpl implements RawReader {
         }
 
         @Override
+        public CompletableFuture<Void> seekAsync(long timestamp) {
+            reset();
+            return super.seekAsync(timestamp);
+        }
+
+        @Override
         public CompletableFuture<Void> seekAsync(MessageId messageId) {
             reset();
             return super.seekAsync(messageId);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 8cc8733..99be7f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -24,12 +24,14 @@ import static org.testng.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.util.RelativeTimeUtil;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -104,4 +106,57 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         }
     }
 
+    @Test
+    public void testSeekTime() throws Exception {
+        final String topicName = "persistent://prop/use/ns-abc/testSeekTime";
+        String resetTimeStr = "100s";
+        long resetTimeInMillis = TimeUnit.SECONDS
+                .toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+
+        // Disable pre-fetch in consumer to track the messages received
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-subscription").receiverQueueSize(0).subscribe();
+
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        assertNotNull(topicRef);
+        assertEquals(topicRef.getProducers().size(), 1);
+        assertEquals(topicRef.getSubscriptions().size(), 1);
+        PersistentSubscription sub = topicRef.getSubscription("my-subscription");
+
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        assertEquals(sub.getNumberOfEntriesInBacklog(), 10);
+
+        long currentTimestamp = System.currentTimeMillis();
+        consumer.seek(currentTimestamp);
+        assertEquals(sub.getNumberOfEntriesInBacklog(), 1);
+
+        // Wait for consumer to reconnect
+        Thread.sleep(1000);
+        consumer.seek(currentTimestamp - resetTimeInMillis);
+        assertEquals(sub.getNumberOfEntriesInBacklog(), 10);
+    }
+
+    @Test
+    public void testSeekTimeOnPartitionedTopic() throws Exception {
+        final String topicName = "persistent://prop/use/ns-abc/testSeekTimePartitions";
+        long timestamp = 1550479732;
+
+        admin.topics().createPartitionedTopic(topicName, 2);
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-subscription").subscribe();
+
+        try {
+            consumer.seek(timestamp);
+            fail("Should not have succeeded");
+        } catch (PulsarClientException e) {
+            // Expected
+        }
+    }
+
 }
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 00da0a4..9f29fd0 100644
--- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -265,6 +265,17 @@ public interface Consumer extends Closeable {
     void seek(MessageId messageId) throws PulsarClientException;
 
     /**
+     * Reset the subscription associated with this consumer to a specific message publish
time.
+     *
+     * Note: this operation can only be done on non-partitioned topics. For these, one can
rather perform the seek() on
+     * the individual partitions.
+     *
+     * @param timestamp
+     *            the message publish time where to reposition the subscription
+     */
+    void seek(long timestamp) throws PulsarClientException;
+
+    /**
      * Reset the subscription associated with this consumer to a specific message id.
      * <p>
      *
@@ -285,6 +296,18 @@ public interface Consumer extends Closeable {
     CompletableFuture<Void> seekAsync(MessageId messageId);
 
     /**
+     * Reset the subscription associated with this consumer to a specific message publish
time.
+     *
+     * Note: this operation can only be done on non-partitioned topics. For these, one can
rather perform the seek() on
+     * the individual partitions.
+     *
+     * @param timestamp
+     *            the message publish time where to reposition the subscription
+     * @return a future to track the completion of the seek operation
+     */
+    CompletableFuture<Void> seekAsync(long timestamp);
+
+    /**
      * @return Whether the consumer is connected to the broker
      */
     boolean isConnected();
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
index e21e572..0896cb5 100644
--- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
@@ -138,6 +138,14 @@ public class ConsumerV1Impl implements Consumer {
         consumer.seek(arg0);
     }
 
+    public void seek(long arg0) throws PulsarClientException {
+        consumer.seek(arg0);
+    }
+
+    public CompletableFuture<Void> seekAsync(long arg0) {
+        return consumer.seekAsync(arg0);
+    }
+
     public CompletableFuture<Void> seekAsync(MessageId arg0) {
         return consumer.seekAsync(arg0);
     }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 3f5b9ae..76c9911 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -274,6 +274,17 @@ public interface Consumer<T> extends Closeable {
     void seek(MessageId messageId) throws PulsarClientException;
 
     /**
+     * Reset the subscription associated with this consumer to a specific message publish
time.
+     *
+     * Note: this operation can only be done on non-partitioned topics. For these, one can
rather perform the seek() on
+     * the individual partitions.
+     *
+     * @param timestamp
+     *            the message publish time where to reposition the subscription
+     */
+    void seek(long timestamp) throws PulsarClientException;
+
+    /**
      * Reset the subscription associated with this consumer to a specific message id.
      * <p>
      *
@@ -294,6 +305,18 @@ public interface Consumer<T> extends Closeable {
     CompletableFuture<Void> seekAsync(MessageId messageId);
 
     /**
+     * Reset the subscription associated with this consumer to a specific message publish
time.
+     *
+     * Note: this operation can only be done on non-partitioned topics. For these, one can
rather perform the seek() on
+     * the individual partitions.
+     *
+     * @param timestamp
+     *            the message publish time where to reposition the subscription
+     * @return a future to track the completion of the seek operation
+     */
+    CompletableFuture<Void> seekAsync(long timestamp);
+
+    /**
      * @return Whether the consumer is connected to the broker
      */
     boolean isConnected();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 7fb4531..4145cb9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1307,6 +1307,45 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandle
     }
 
     @Override
+    public void seek(long timestamp) throws PulsarClientException {
+        try {
+            seekAsync(timestamp).get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> seekAsync(long timestamp) {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil
+                    .failedFuture(new PulsarClientException.AlreadyClosedException("Consumer
was already closed"));
+        }
+
+        if (!isConnected()) {
+            return FutureUtil.failedFuture(new PulsarClientException("Not connected to broker"));
+        }
+
+        final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+
+        long requestId = client.newRequestId();
+        ByteBuf seek = Commands.newSeek(consumerId, requestId, timestamp);
+        ClientCnx cnx = cnx();
+
+        log.info("[{}][{}] Seek subscription to publish time {}", topic, subscription, timestamp);
+
+        cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
+            log.info("[{}][{}] Successfully reset subscription to publish time {}", topic,
subscription, timestamp);
+            seekFuture.complete(null);
+        }).exceptionally(e -> {
+            log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
+            seekFuture.completeExceptionally(e.getCause());
+            return null;
+        });
+        return seekFuture;
+    }
+
+    @Override
     public CompletableFuture<Void> seekAsync(MessageId messageId) {
         if (getState() == State.Closing || getState() == State.Closed) {
             return FutureUtil
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index ddd44d2..80532e1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -555,11 +555,27 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T>
{
     }
 
     @Override
+    public void seek(long timestamp) throws PulsarClientException {
+        try {
+            seekAsync(timestamp).get();
+        } catch (ExecutionException e) {
+            throw new PulsarClientException(e.getCause());
+        } catch (InterruptedException e) {
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
     public CompletableFuture<Void> seekAsync(MessageId messageId) {
         return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported
on topics consumer"));
     }
 
     @Override
+    public CompletableFuture<Void> seekAsync(long timestamp) {
+        return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported
on topics consumer"));
+    }
+
+    @Override
     public int getAvailablePermits() {
         return consumers.values().stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 2adb274..b047b56 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -404,6 +404,21 @@ public class Commands {
         return res;
     }
 
+    public static ByteBuf newSeek(long consumerId, long requestId, long timestamp) {
+        CommandSeek.Builder seekBuilder = CommandSeek.newBuilder();
+        seekBuilder.setConsumerId(consumerId);
+        seekBuilder.setRequestId(requestId);
+
+        seekBuilder.setMessagePublishTime(timestamp);
+
+        CommandSeek seek = seekBuilder.build();
+        ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.SEEK).setSeek(seek));
+
+        seekBuilder.recycle();
+        seek.recycle();
+        return res;
+    }
+
     public static ByteBuf newCloseConsumer(long consumerId, long requestId) {
         CommandCloseConsumer.Builder closeConsumerBuilder = CommandCloseConsumer.newBuilder();
         closeConsumerBuilder.setConsumerId(consumerId);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 66180fb..2940a99 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -16195,6 +16195,10 @@ public final class PulsarApi {
     // optional .pulsar.proto.MessageIdData message_id = 3;
     boolean hasMessageId();
     org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getMessageId();
+    
+    // optional uint64 message_publish_time = 4;
+    boolean hasMessagePublishTime();
+    long getMessagePublishTime();
   }
   public static final class CommandSeek extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -16261,10 +16265,21 @@ public final class PulsarApi {
       return messageId_;
     }
     
+    // optional uint64 message_publish_time = 4;
+    public static final int MESSAGE_PUBLISH_TIME_FIELD_NUMBER = 4;
+    private long messagePublishTime_;
+    public boolean hasMessagePublishTime() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public long getMessagePublishTime() {
+      return messagePublishTime_;
+    }
+    
     private void initFields() {
       consumerId_ = 0L;
       requestId_ = 0L;
       messageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+      messagePublishTime_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -16306,6 +16321,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeMessage(3, messageId_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeUInt64(4, messagePublishTime_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -16326,6 +16344,10 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeMessageSize(3, messageId_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(4, messagePublishTime_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -16445,6 +16467,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000002);
         messageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
         bitField0_ = (bitField0_ & ~0x00000004);
+        messagePublishTime_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       
@@ -16490,6 +16514,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000004;
         }
         result.messageId_ = messageId_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.messagePublishTime_ = messagePublishTime_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -16505,6 +16533,9 @@ public final class PulsarApi {
         if (other.hasMessageId()) {
           mergeMessageId(other.getMessageId());
         }
+        if (other.hasMessagePublishTime()) {
+          setMessagePublishTime(other.getMessagePublishTime());
+        }
         return this;
       }
       
@@ -16568,6 +16599,11 @@ public final class PulsarApi {
               subBuilder.recycle();
               break;
             }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              messagePublishTime_ = input.readUInt64();
+              break;
+            }
           }
         }
       }
@@ -16659,6 +16695,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional uint64 message_publish_time = 4;
+      private long messagePublishTime_ ;
+      public boolean hasMessagePublishTime() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public long getMessagePublishTime() {
+        return messagePublishTime_;
+      }
+      public Builder setMessagePublishTime(long value) {
+        bitField0_ |= 0x00000008;
+        messagePublishTime_ = value;
+        
+        return this;
+      }
+      public Builder clearMessagePublishTime() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        messagePublishTime_ = 0L;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSeek)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 761d8a1..3f10d49 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -397,6 +397,7 @@ message CommandSeek {
 	required uint64 request_id  = 2;
 
 	optional MessageIdData message_id = 3;
+	optional uint64 message_publish_time = 4;
 }
 
 // Message sent by broker to client when a topic
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
index a6bcc3a..89b842f 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
@@ -490,11 +490,21 @@ public class PulsarConsumerSourceTests {
         }
 
         @Override
+        public void seek(long timestamp) throws PulsarClientException {
+
+        }
+
+        @Override
         public CompletableFuture<Void> seekAsync(MessageId messageId) {
             return null;
         }
 
         @Override
+        public CompletableFuture<Void> seekAsync(long timestamp) {
+            return null;
+        }
+
+        @Override
         public boolean isConnected() {
             return true;
         }


Mime
View raw message