pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [incubator-pulsar] branch master updated: Namespace level policy for offload deletion lag (#2256)
Date Wed, 01 Aug 2018 08:22:12 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 595ba81  Namespace level policy for offload deletion lag (#2256)
595ba81 is described below

commit 595ba8102ba811e301b917ce8b34b5abe36fcb62
Author: Ivan Kelly <ivank@apache.org>
AuthorDate: Wed Aug 1 10:22:10 2018 +0200

    Namespace level policy for offload deletion lag (#2256)
    
    Add a policy parameter at the namespace level for the offload deletion
    lag, the amount of time to wait after offloading a ledger before we
    delete the ledger from bookkeeper.
    
    This namespace policy overrides the broker configured policy. Via the
    REST api this is exposed at a millisecond granularity while via the
    CLI it is exposed a minute granularity.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  39 +++++++-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  44 +++++++++
 .../pulsar/broker/service/BrokerService.java       |  12 ++-
 .../org/apache/pulsar/client/admin/Namespaces.java |  69 +++++++++++++
 .../client/admin/internal/NamespacesImpl.java      |  35 +++++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  52 ++++++++++
 .../pulsar/common/policies/data/Policies.java      |   7 +-
 .../tests/integration/offload/TestS3Offload.java   | 109 +++++++++++++++++++++
 8 files changed, 360 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 553f53a..c4202c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1590,7 +1590,7 @@ public abstract class NamespacesBase extends AdminResource {
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
             log.info("[{}] Successfully updated offloadThreshold configuration: namespace={},
value={}",
-                     clientAppId(), namespaceName, policies.compaction_threshold);
+                     clientAppId(), namespaceName, policies.offload_threshold);
 
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to update offloadThreshold configuration for namespace
{}: does not exist",
@@ -1609,5 +1609,42 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    protected Long internalGetOffloadDeletionLag() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        return getNamespacePolicies(namespaceName).offload_deletion_lag_ms;
+    }
+
+    protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, Policies.class);
+            policies.offload_deletion_lag_ms = newDeletionLagMs;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+            log.info("[{}] Successfully updated offloadDeletionLagMs configuration: namespace={},
value={}",
+                     clientAppId(), namespaceName, policies.offload_deletion_lag_ms);
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update offloadDeletionLagMs configuration for namespace
{}: does not exist",
+                     clientAppId(), namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update offloadDeletionLagMs configuration for namespace
{}: concurrent modification",
+                     clientAppId(), namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update offloadDeletionLag configuration for namespace
{}",
+                      clientAppId(), namespaceName, e);
+            throw new RestException(e);
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 0fcd4aa..07e26ae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -715,5 +715,49 @@ public class Namespaces extends NamespacesBase {
         internalSetOffloadThreshold(newThreshold);
     }
 
+    @GET
+    @Path("/{property}/{namespace}/offloadDeletionLagMs")
+    @ApiOperation(value = "Number of milliseconds to wait before deleting a ledger segment
which has been offloaded"
+                          + " from the Pulsar cluster's local storage (i.e. BookKeeper)",
+                  notes = "A negative value denotes that deletion has been completely disabled."
+                          + " 'null' denotes that the topics in the namespace will fall back
to the"
+                          + " broker default for deletion lag.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace doesn't exist")
})
+    public Long getOffloadDeletionLag(@PathParam("property") String property,
+                                      @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, namespace);
+        return internalGetOffloadDeletionLag();
+    }
+
+    @PUT
+    @Path("/{property}/{namespace}/offloadDeletionLagMs")
+    @ApiOperation(value = "Set number of milliseconds to wait before deleting a ledger segment
which has been offloaded"
+                          + " from the Pulsar cluster's local storage (i.e. BookKeeper)",
+                  notes = "A negative value disables the deletion completely.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent modification"),
+                            @ApiResponse(code = 412, message = "offloadDeletionLagMs value
is not valid") })
+    public void setOffloadDeletionLag(@PathParam("property") String property,
+                                      @PathParam("namespace") String namespace,
+                                      long newDeletionLagMs) {
+        validateNamespaceName(property, namespace);
+        internalSetOffloadDeletionLag(newDeletionLagMs);
+    }
+
+    @DELETE
+    @Path("/{property}/{namespace}/offloadDeletionLagMs")
+    @ApiOperation(value = "Clear the namespace configured offload deletion lag. The topics
in the namespace"
+                          + " will fallback to using the default configured deletion lag
for the broker")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent modification")
})
+    public void clearOffloadDeletionLag(@PathParam("property") String property,
+                                        @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, namespace);
+        internalSetOffloadDeletionLag(null);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 182f3cc..42893a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -766,10 +766,14 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
 
             managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader());
-            managedLedgerConfig.setOffloadLedgerDeletionLag(serviceConfig.getManagedLedgerOffloadDeletionLagMs(),
-                                                            TimeUnit.MILLISECONDS);
-
-            policies.ifPresent(p -> managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(p.offload_threshold));
+            policies.ifPresent(p -> {
+                    long lag = serviceConfig.getManagedLedgerOffloadDeletionLagMs();
+                    if (p.offload_deletion_lag_ms != null) {
+                        lag = p.offload_deletion_lag_ms;
+                    }
+                    managedLedgerConfig.setOffloadLedgerDeletionLag(lag, TimeUnit.MILLISECONDS);
+                    managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(p.offload_threshold);
+                });
 
             future.complete(managedLedgerConfig);
         }, (exception) -> future.completeExceptionally(exception)));
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 5ee2f22..8a319c6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.admin;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
@@ -1212,4 +1213,72 @@ public interface Namespaces {
      *             Unexpected error
      */
     void setOffloadThreshold(String namespace, long compactionThreshold) throws PulsarAdminException;
+
+    /**
+     * Get the offload deletion lag for a namespace, in milliseconds.
+     * The number of milliseconds to wait before deleting a ledger segment which has been
offloaded from
+     * the Pulsar cluster's local storage (i.e. BookKeeper).
+     *
+     * If the offload deletion lag has not been set for the namespace, the method returns
'null'
+     * and the namespace will use the configured default of the pulsar broker.
+     *
+     * A negative value disables deletion of the local ledger completely, though it will
still be deleted
+     * if it exceeds the topics retention policy, along with the offloaded copy.
+     *
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>3600000</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @return the offload deletion lag for the namespace in milliseconds, or null if not
set
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    Long getOffloadDeleteLagMs(String namespace) throws PulsarAdminException;
+
+    /**
+     * Set the offload deletion lag for a namespace.
+     *
+     * The offload deletion lag is the amount of time to wait after offloading a ledger segment
to long term storage,
+     * before deleting its copy stored on the Pulsar cluster's local storage (i.e. BookKeeper).
+     *
+     * A negative value disables deletion of the local ledger completely, though it will
still be deleted
+     * if it exceeds the topics retention policy, along with the offloaded copy.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param lag the duration to wait before deleting the local copy
+     * @param unit the timeunit of the duration
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setOffloadDeleteLag(String namespace, long lag, TimeUnit unit) throws PulsarAdminException;
+
+    /**
+     * Clear the offload deletion lag for a namespace.
+     *
+     * The namespace will fall back to using the configured default of the pulsar broker.
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void clearOffloadDeleteLag(String namespace) throws PulsarAdminException;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 4ed0b8a..ba2950e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
@@ -701,6 +702,40 @@ public class NamespacesImpl extends BaseResource implements Namespaces
{
         }
     }
 
+    @Override
+    public Long getOffloadDeleteLagMs(String namespace) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+            return request(path).get(Long.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setOffloadDeleteLag(String namespace, long lag, TimeUnit unit) throws PulsarAdminException
{
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+            request(path).put(Entity.entity(TimeUnit.MILLISECONDS.convert(lag, unit), MediaType.APPLICATION_JSON),
+                              ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void clearOffloadDeleteLag(String namespace) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+            request(path).delete(ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2Namespaces : adminNamespaces;
         WebTarget namespacePath = base.path(namespace.toString());
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 4f898a5..341b249 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.admin.cli.utils.IOUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -832,6 +833,53 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get offloadDeletionLag, in minutes, for a namespace")
+    private class GetOffloadDeletionLag extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            Long lag = admin.namespaces().getOffloadDeleteLagMs(namespace);
+            if (lag != null) {
+                System.out.println(TimeUnit.MINUTES.convert(lag, TimeUnit.MILLISECONDS) +
" minute(s)");
+            } else {
+                System.out.println("Unset for namespace. Defaulting to broker setting.");
+            }
+        }
+    }
+
+    @Parameters(commandDescription = "Set offloadDeletionLag for a namespace")
+    private class SetOffloadDeletionLag extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--lag", "-l" },
+                   description = "Duration to wait after offloading a ledger segment, before
deleting the copy of that"
+                                  + " segment from cluster local storage. (eg: 10m, 5h, 3d,
2w).",
+                   required = true)
+        private String lag = "-1";
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setOffloadDeleteLag(namespace, validateTimeString(lag), TimeUnit.MINUTES);
+        }
+    }
+
+    @Parameters(commandDescription = "Clear offloadDeletionLag for a namespace")
+    private class ClearOffloadDeletionLag extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().clearOffloadDeleteLag(namespace);
+        }
+    }
+
     public CmdNamespaces(PulsarAdmin admin) {
         super("namespaces", admin);
         jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -897,5 +945,9 @@ public class CmdNamespaces extends CmdBase {
         jcommander.addCommand("get-offload-threshold", new GetOffloadThreshold());
         jcommander.addCommand("set-offload-threshold", new SetOffloadThreshold());
 
+        jcommander.addCommand("get-offload-deletion-lag", new GetOffloadDeletionLag());
+        jcommander.addCommand("set-offload-deletion-lag", new SetOffloadDeletionLag());
+        jcommander.addCommand("clear-offload-deletion-lag", new ClearOffloadDeletionLag());
+
     }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 35a752b..9c7efeb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -59,6 +59,7 @@ public class Policies {
 
     public long compaction_threshold = 0;
     public long offload_threshold = -1;
+    public Long offload_deletion_lag_ms = null;
 
     @Override
     public boolean equals(Object obj) {
@@ -80,7 +81,8 @@ public class Policies {
                     && max_consumers_per_topic == other.max_consumers_per_topic
                     && max_consumers_per_subscription == other.max_consumers_per_subscription
                     && compaction_threshold == other.compaction_threshold
-                    && offload_threshold == other.offload_threshold;
+                    && offload_threshold == other.offload_threshold
+                    && offload_deletion_lag_ms == other.offload_deletion_lag_ms;
         }
 
         return false;
@@ -112,6 +114,7 @@ public class Policies {
                 .add("max_consumers_per_topic", max_consumers_per_topic)
                 .add("max_consumers_per_subscription", max_consumers_per_topic)
                 .add("compaction_threshold", compaction_threshold)
-                .add("offload_threshold", offload_threshold).toString();
+                .add("offload_threshold", offload_threshold)
+                .add("offload_deletion_lag_ms", offload_deletion_lag_ms).toString();
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
index b595dd3..dd1cb7a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pulsar.tests.integration.offload;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableMap;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -32,6 +35,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
 
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.containers.S3Container;
@@ -241,4 +245,109 @@ public class TestS3Offload extends PulsarClusterTestBase {
             Assert.assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1L);
         }
     }
+
+    private boolean ledgerOffloaded(List<LedgerInfo> ledgers, long ledgerId) {
+        return ledgers.stream().filter(l -> l.ledgerId == ledgerId)
+            .map(l -> l.offloaded).findFirst().get();
+    }
+
+    private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic)
throws Exception {
+        try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+            Producer producer = client.newProducer().topic(topic)
+                .blockIfQueueFull(true).enableBatching(false).create();
+            PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+
+            List<LedgerInfo> ledgers = admin.topics().getInternalStats(topic).ledgers;
+            long currentLedger = ledgers.get(ledgers.size() - 1).ledgerId;
+
+            client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
+
+            // write enough to topic to make it roll twice
+            for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+                producer.sendAsync(buildEntry("offload-message" + i));
+            }
+            producer.send(buildEntry("final-offload-message"));
+
+            // wait up to 30 seconds for offload to occur
+            for (int i = 0;
+                 i < 300 && !ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers,
currentLedger);
+                 i++) {
+                Thread.sleep(100);
+            }
+            Assert.assertTrue(ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers,
currentLedger));
+
+            return currentLedger;
+        }
+    }
+
+    public boolean ledgerExistsInBookKeeper(long ledgerId) throws Exception {
+        ClientConfiguration bkConf = new ClientConfiguration();
+        bkConf.setZkServers(pulsarCluster.getZKConnString());
+        try (BookKeeperAdmin bk = new BookKeeperAdmin(bkConf)) {
+            try {
+                bk.openLedger(ledgerId).close();
+                return true;
+            } catch (BKException.BKNoSuchLedgerExistsException e) {
+                return false;
+            }
+        }
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl)
throws Exception {
+        final String tenant = "s3-offload-test-deletion-lag-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        pulsarCluster.runAdminCommandOnAnyBroker("tenants",
+                "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+                "--admin-roles", "offload-admin", tenant);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "create", "--clusters", pulsarCluster.getClusterName(), namespace);
+
+        // set threshold to offload runs immediately after role
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                                                 "set-offload-threshold", "--size", "0",
namespace);
+
+        String output = pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
+        Assert.assertTrue(output.contains("Unset for namespace"));
+
+        long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+        // give it up to 5 seconds to delete, it shouldn't
+        // so we wait this every time
+        Thread.sleep(5000);
+        Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag",
namespace,
+                                                 "--lag", "0m");
+        output = pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
+        Assert.assertTrue(output.contains("0 minute(s)"));
+
+        offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+        // wait up to 10 seconds for ledger to be deleted
+        for (int i = 0; i < 10 && ledgerExistsInBookKeeper(offloadedLedger); i++)
{
+            writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+            Thread.sleep(1000);
+        }
+        Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger));
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "clear-offload-deletion-lag",
namespace);
+
+        Thread.sleep(5); // wait 5 seconds to allow broker to see update
+
+        output = pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
+        Assert.assertTrue(output.contains("Unset for namespace"));
+
+        offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+
+        // give it up to 5 seconds to delete, it shouldn't
+        // so we wait this every time
+        Thread.sleep(5000);
+        Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
+    }
+
 }


Mime
View raw message