pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2256: Namespace level policy for offload deletion lag
Date Wed, 01 Aug 2018 08:22:12 GMT
sijie closed pull request #2256: Namespace level policy for offload deletion lag
URL: https://github.com/apache/incubator-pulsar/pull/2256
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 553f53ab90..c4202c68db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1590,7 +1590,7 @@ protected void internalSetOffloadThreshold(long newThreshold) {
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
             log.info("[{}] Successfully updated offloadThreshold configuration: namespace={},
value={}",
-                     clientAppId(), namespaceName, policies.compaction_threshold);
+                     clientAppId(), namespaceName, policies.offload_threshold);
 
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to update offloadThreshold configuration for namespace
{}: does not exist",
@@ -1609,5 +1609,42 @@ protected void internalSetOffloadThreshold(long newThreshold) {
         }
     }
 
+    protected Long internalGetOffloadDeletionLag() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        return getNamespacePolicies(namespaceName).offload_deletion_lag_ms;
+    }
+
+    protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, Policies.class);
+            policies.offload_deletion_lag_ms = newDeletionLagMs;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+            log.info("[{}] Successfully updated offloadDeletionLagMs configuration: namespace={},
value={}",
+                     clientAppId(), namespaceName, policies.offload_deletion_lag_ms);
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update offloadDeletionLagMs configuration for namespace
{}: does not exist",
+                     clientAppId(), namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update offloadDeletionLagMs configuration for namespace
{}: concurrent modification",
+                     clientAppId(), namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update offloadDeletionLag configuration for namespace
{}",
+                      clientAppId(), namespaceName, e);
+            throw new RestException(e);
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 0fcd4aa6f9..07e26ae0da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -715,5 +715,49 @@ public void setOffloadThreshold(@PathParam("property") String property,
         internalSetOffloadThreshold(newThreshold);
     }
 
+    @GET
+    @Path("/{property}/{namespace}/offloadDeletionLagMs")
+    @ApiOperation(value = "Number of milliseconds to wait before deleting a ledger segment
which has been offloaded"
+                          + " from the Pulsar cluster's local storage (i.e. BookKeeper)",
+                  notes = "A negative value denotes that deletion has been completely disabled."
+                          + " 'null' denotes that the topics in the namespace will fall back
to the"
+                          + " broker default for deletion lag.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace doesn't exist")
})
+    public Long getOffloadDeletionLag(@PathParam("property") String property,
+                                      @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, namespace);
+        return internalGetOffloadDeletionLag();
+    }
+
+    @PUT
+    @Path("/{property}/{namespace}/offloadDeletionLagMs")
+    @ApiOperation(value = "Set number of milliseconds to wait before deleting a ledger segment
which has been offloaded"
+                          + " from the Pulsar cluster's local storage (i.e. BookKeeper)",
+                  notes = "A negative value disables the deletion completely.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent modification"),
+                            @ApiResponse(code = 412, message = "offloadDeletionLagMs value
is not valid") })
+    public void setOffloadDeletionLag(@PathParam("property") String property,
+                                      @PathParam("namespace") String namespace,
+                                      long newDeletionLagMs) {
+        validateNamespaceName(property, namespace);
+        internalSetOffloadDeletionLag(newDeletionLagMs);
+    }
+
+    @DELETE
+    @Path("/{property}/{namespace}/offloadDeletionLagMs")
+    @ApiOperation(value = "Clear the namespace configured offload deletion lag. The topics
in the namespace"
+                          + " will fallback to using the default configured deletion lag
for the broker")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent modification")
})
+    public void clearOffloadDeletionLag(@PathParam("property") String property,
+                                        @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, namespace);
+        internalSetOffloadDeletionLag(null);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 182f3cc152..42893a7676 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -766,10 +766,14 @@ public void openLedgerFailed(ManagedLedgerException exception, Object
ctx) {
             managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
 
             managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader());
-            managedLedgerConfig.setOffloadLedgerDeletionLag(serviceConfig.getManagedLedgerOffloadDeletionLagMs(),
-                                                            TimeUnit.MILLISECONDS);
-
-            policies.ifPresent(p -> managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(p.offload_threshold));
+            policies.ifPresent(p -> {
+                    long lag = serviceConfig.getManagedLedgerOffloadDeletionLagMs();
+                    if (p.offload_deletion_lag_ms != null) {
+                        lag = p.offload_deletion_lag_ms;
+                    }
+                    managedLedgerConfig.setOffloadLedgerDeletionLag(lag, TimeUnit.MILLISECONDS);
+                    managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(p.offload_threshold);
+                });
 
             future.complete(managedLedgerConfig);
         }, (exception) -> future.completeExceptionally(exception)));
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 5ee2f2227e..8a319c6969 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
@@ -1212,4 +1213,72 @@ void clearNamespaceBundleBacklogForSubscription(String namespace, String
bundle,
      *             Unexpected error
      */
     void setOffloadThreshold(String namespace, long compactionThreshold) throws PulsarAdminException;
+
+    /**
+     * Get the offload deletion lag for a namespace, in milliseconds.
+     * The number of milliseconds to wait before deleting a ledger segment which has been
offloaded from
+     * the Pulsar cluster's local storage (i.e. BookKeeper).
+     *
+     * If the offload deletion lag has not been set for the namespace, the method returns
'null'
+     * and the namespace will use the configured default of the pulsar broker.
+     *
+     * A negative value disables deletion of the local ledger completely, though it will
still be deleted
+     * if it exceeds the topics retention policy, along with the offloaded copy.
+     *
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>3600000</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @return the offload deletion lag for the namespace in milliseconds, or null if not
set
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    Long getOffloadDeleteLagMs(String namespace) throws PulsarAdminException;
+
+    /**
+     * Set the offload deletion lag for a namespace.
+     *
+     * The offload deletion lag is the amount of time to wait after offloading a ledger segment
to long term storage,
+     * before deleting its copy stored on the Pulsar cluster's local storage (i.e. BookKeeper).
+     *
+     * A negative value disables deletion of the local ledger completely, though it will
still be deleted
+     * if it exceeds the topics retention policy, along with the offloaded copy.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param lag the duration to wait before deleting the local copy
+     * @param unit the timeunit of the duration
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setOffloadDeleteLag(String namespace, long lag, TimeUnit unit) throws PulsarAdminException;
+
+    /**
+     * Clear the offload deletion lag for a namespace.
+     *
+     * The namespace will fall back to using the configured default of the pulsar broker.
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void clearOffloadDeleteLag(String namespace) throws PulsarAdminException;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 4ed0b8acf9..ba2950efe5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
@@ -701,6 +702,40 @@ public void setOffloadThreshold(String namespace, long offloadThreshold)
throws
         }
     }
 
+    @Override
+    public Long getOffloadDeleteLagMs(String namespace) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+            return request(path).get(Long.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setOffloadDeleteLag(String namespace, long lag, TimeUnit unit) throws PulsarAdminException
{
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+            request(path).put(Entity.entity(TimeUnit.MILLISECONDS.convert(lag, unit), MediaType.APPLICATION_JSON),
+                              ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void clearOffloadDeleteLag(String namespace) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+            request(path).delete(ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2Namespaces : adminNamespaces;
         WebTarget namespacePath = base.path(namespace.toString());
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 4f898a576d..341b24995f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -29,6 +29,7 @@
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.admin.cli.utils.IOUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -832,6 +833,53 @@ void run() throws PulsarAdminException {
         }
     }
 
+    @Parameters(commandDescription = "Get offloadDeletionLag, in minutes, for a namespace")
+    private class GetOffloadDeletionLag extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            Long lag = admin.namespaces().getOffloadDeleteLagMs(namespace);
+            if (lag != null) {
+                System.out.println(TimeUnit.MINUTES.convert(lag, TimeUnit.MILLISECONDS) +
" minute(s)");
+            } else {
+                System.out.println("Unset for namespace. Defaulting to broker setting.");
+            }
+        }
+    }
+
+    @Parameters(commandDescription = "Set offloadDeletionLag for a namespace")
+    private class SetOffloadDeletionLag extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--lag", "-l" },
+                   description = "Duration to wait after offloading a ledger segment, before
deleting the copy of that"
+                                  + " segment from cluster local storage. (eg: 10m, 5h, 3d,
2w).",
+                   required = true)
+        private String lag = "-1";
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setOffloadDeleteLag(namespace, validateTimeString(lag), TimeUnit.MINUTES);
+        }
+    }
+
+    @Parameters(commandDescription = "Clear offloadDeletionLag for a namespace")
+    private class ClearOffloadDeletionLag extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().clearOffloadDeleteLag(namespace);
+        }
+    }
+
     public CmdNamespaces(PulsarAdmin admin) {
         super("namespaces", admin);
         jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -897,5 +945,9 @@ public CmdNamespaces(PulsarAdmin admin) {
         jcommander.addCommand("get-offload-threshold", new GetOffloadThreshold());
         jcommander.addCommand("set-offload-threshold", new SetOffloadThreshold());
 
+        jcommander.addCommand("get-offload-deletion-lag", new GetOffloadDeletionLag());
+        jcommander.addCommand("set-offload-deletion-lag", new SetOffloadDeletionLag());
+        jcommander.addCommand("clear-offload-deletion-lag", new ClearOffloadDeletionLag());
+
     }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 35a752b78f..9c7efeb6a9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -59,6 +59,7 @@
 
     public long compaction_threshold = 0;
     public long offload_threshold = -1;
+    public Long offload_deletion_lag_ms = null;
 
     @Override
     public boolean equals(Object obj) {
@@ -80,7 +81,8 @@ public boolean equals(Object obj) {
                     && max_consumers_per_topic == other.max_consumers_per_topic
                     && max_consumers_per_subscription == other.max_consumers_per_subscription
                     && compaction_threshold == other.compaction_threshold
-                    && offload_threshold == other.offload_threshold;
+                    && offload_threshold == other.offload_threshold
+                    && offload_deletion_lag_ms == other.offload_deletion_lag_ms;
         }
 
         return false;
@@ -112,6 +114,7 @@ public String toString() {
                 .add("max_consumers_per_topic", max_consumers_per_topic)
                 .add("max_consumers_per_subscription", max_consumers_per_topic)
                 .add("compaction_threshold", compaction_threshold)
-                .add("offload_threshold", offload_threshold).toString();
+                .add("offload_threshold", offload_threshold)
+                .add("offload_deletion_lag_ms", offload_deletion_lag_ms).toString();
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
index b595dd3729..dd1cb7ae84 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pulsar.tests.integration.offload;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableMap;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -32,6 +35,7 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
 
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.containers.S3Container;
@@ -241,4 +245,109 @@ public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl,
String a
             Assert.assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1L);
         }
     }
+
+    private boolean ledgerOffloaded(List<LedgerInfo> ledgers, long ledgerId) {
+        return ledgers.stream().filter(l -> l.ledgerId == ledgerId)
+            .map(l -> l.offloaded).findFirst().get();
+    }
+
+    private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic)
throws Exception {
+        try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+            Producer producer = client.newProducer().topic(topic)
+                .blockIfQueueFull(true).enableBatching(false).create();
+            PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+
+            List<LedgerInfo> ledgers = admin.topics().getInternalStats(topic).ledgers;
+            long currentLedger = ledgers.get(ledgers.size() - 1).ledgerId;
+
+            client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
+
+            // write enough to topic to make it roll twice
+            for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+                producer.sendAsync(buildEntry("offload-message" + i));
+            }
+            producer.send(buildEntry("final-offload-message"));
+
+            // wait up to 30 seconds for offload to occur
+            for (int i = 0;
+                 i < 300 && !ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers,
currentLedger);
+                 i++) {
+                Thread.sleep(100);
+            }
+            Assert.assertTrue(ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers,
currentLedger));
+
+            return currentLedger;
+        }
+    }
+
+    public boolean ledgerExistsInBookKeeper(long ledgerId) throws Exception {
+        ClientConfiguration bkConf = new ClientConfiguration();
+        bkConf.setZkServers(pulsarCluster.getZKConnString());
+        try (BookKeeperAdmin bk = new BookKeeperAdmin(bkConf)) {
+            try {
+                bk.openLedger(ledgerId).close();
+                return true;
+            } catch (BKException.BKNoSuchLedgerExistsException e) {
+                return false;
+            }
+        }
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl)
throws Exception {
+        final String tenant = "s3-offload-test-deletion-lag-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        pulsarCluster.runAdminCommandOnAnyBroker("tenants",
+                "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+                "--admin-roles", "offload-admin", tenant);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "create", "--clusters", pulsarCluster.getClusterName(), namespace);
+
+        // set threshold to offload runs immediately after role
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                                                 "set-offload-threshold", "--size", "0",
namespace);
+
+        String output = pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
+        Assert.assertTrue(output.contains("Unset for namespace"));
+
+        long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+        // give it up to 5 seconds to delete, it shouldn't
+        // so we wait this every time
+        Thread.sleep(5000);
+        Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag",
namespace,
+                                                 "--lag", "0m");
+        output = pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
+        Assert.assertTrue(output.contains("0 minute(s)"));
+
+        offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+        // wait up to 10 seconds for ledger to be deleted
+        for (int i = 0; i < 10 && ledgerExistsInBookKeeper(offloadedLedger); i++)
{
+            writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+            Thread.sleep(1000);
+        }
+        Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger));
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "clear-offload-deletion-lag",
namespace);
+
+        Thread.sleep(5); // wait 5 seconds to allow broker to see update
+
+        output = pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
+        Assert.assertTrue(output.contains("Unset for namespace"));
+
+        offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+
+        // give it up to 5 seconds to delete, it shouldn't
+        // so we wait this every time
+        Thread.sleep(5000);
+        Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message