This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 595ba81 Namespace level policy for offload deletion lag (#2256)
595ba81 is described below
commit 595ba8102ba811e301b917ce8b34b5abe36fcb62
Author: Ivan Kelly <ivank@apache.org>
AuthorDate: Wed Aug 1 10:22:10 2018 +0200
Namespace level policy for offload deletion lag (#2256)
Add a policy parameter at the namespace level for the offload deletion
lag, the amount of time to wait after offloading a ledger before we
delete the ledger from bookkeeper.
This namespace policy overrides the broker configured policy. Via the
REST api this is exposed at a millisecond granularity while via the
CLI it is exposed a minute granularity.
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 39 +++++++-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 44 +++++++++
.../pulsar/broker/service/BrokerService.java | 12 ++-
.../org/apache/pulsar/client/admin/Namespaces.java | 69 +++++++++++++
.../client/admin/internal/NamespacesImpl.java | 35 +++++++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 52 ++++++++++
.../pulsar/common/policies/data/Policies.java | 7 +-
.../tests/integration/offload/TestS3Offload.java | 109 +++++++++++++++++++++
8 files changed, 360 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 553f53a..c4202c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1590,7 +1590,7 @@ public abstract class NamespacesBase extends AdminResource {
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated offloadThreshold configuration: namespace={},
value={}",
- clientAppId(), namespaceName, policies.compaction_threshold);
+ clientAppId(), namespaceName, policies.offload_threshold);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update offloadThreshold configuration for namespace
{}: does not exist",
@@ -1609,5 +1609,42 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ protected Long internalGetOffloadDeletionLag() {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ return getNamespacePolicies(namespaceName).offload_deletion_lag_ms;
+ }
+
+ protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
+ validateSuperUserAccess();
+ validatePoliciesReadOnlyAccess();
+
+ try {
+ Stat nodeStat = new Stat();
+ final String path = path(POLICIES, namespaceName.toString());
+ byte[] content = globalZk().getData(path, null, nodeStat);
+ Policies policies = jsonMapper().readValue(content, Policies.class);
+ policies.offload_deletion_lag_ms = newDeletionLagMs;
+ globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+ policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+ log.info("[{}] Successfully updated offloadDeletionLagMs configuration: namespace={},
value={}",
+ clientAppId(), namespaceName, policies.offload_deletion_lag_ms);
+
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("[{}] Failed to update offloadDeletionLagMs configuration for namespace
{}: does not exist",
+ clientAppId(), namespaceName);
+ throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+ } catch (KeeperException.BadVersionException e) {
+ log.warn("[{}] Failed to update offloadDeletionLagMs configuration for namespace
{}: concurrent modification",
+ clientAppId(), namespaceName);
+ throw new RestException(Status.CONFLICT, "Concurrent modification");
+ } catch (RestException pfe) {
+ throw pfe;
+ } catch (Exception e) {
+ log.error("[{}] Failed to update offloadDeletionLag configuration for namespace
{}",
+ clientAppId(), namespaceName, e);
+ throw new RestException(e);
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 0fcd4aa..07e26ae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -715,5 +715,49 @@ public class Namespaces extends NamespacesBase {
internalSetOffloadThreshold(newThreshold);
}
+ @GET
+ @Path("/{property}/{namespace}/offloadDeletionLagMs")
+ @ApiOperation(value = "Number of milliseconds to wait before deleting a ledger segment
which has been offloaded"
+ + " from the Pulsar cluster's local storage (i.e. BookKeeper)",
+ notes = "A negative value denotes that deletion has been completely disabled."
+ + " 'null' denotes that the topics in the namespace will fall back
to the"
+ + " broker default for deletion lag.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace doesn't exist")
})
+ public Long getOffloadDeletionLag(@PathParam("property") String property,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, namespace);
+ return internalGetOffloadDeletionLag();
+ }
+
+ @PUT
+ @Path("/{property}/{namespace}/offloadDeletionLagMs")
+ @ApiOperation(value = "Set number of milliseconds to wait before deleting a ledger segment
which has been offloaded"
+ + " from the Pulsar cluster's local storage (i.e. BookKeeper)",
+ notes = "A negative value disables the deletion completely.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 412, message = "offloadDeletionLagMs value
is not valid") })
+ public void setOffloadDeletionLag(@PathParam("property") String property,
+ @PathParam("namespace") String namespace,
+ long newDeletionLagMs) {
+ validateNamespaceName(property, namespace);
+ internalSetOffloadDeletionLag(newDeletionLagMs);
+ }
+
+ @DELETE
+ @Path("/{property}/{namespace}/offloadDeletionLagMs")
+ @ApiOperation(value = "Clear the namespace configured offload deletion lag. The topics
in the namespace"
+ + " will fallback to using the default configured deletion lag
for the broker")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification")
})
+ public void clearOffloadDeletionLag(@PathParam("property") String property,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, namespace);
+ internalSetOffloadDeletionLag(null);
+ }
+
private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 182f3cc..42893a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -766,10 +766,14 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader());
- managedLedgerConfig.setOffloadLedgerDeletionLag(serviceConfig.getManagedLedgerOffloadDeletionLagMs(),
- TimeUnit.MILLISECONDS);
-
- policies.ifPresent(p -> managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(p.offload_threshold));
+ policies.ifPresent(p -> {
+ long lag = serviceConfig.getManagedLedgerOffloadDeletionLagMs();
+ if (p.offload_deletion_lag_ms != null) {
+ lag = p.offload_deletion_lag_ms;
+ }
+ managedLedgerConfig.setOffloadLedgerDeletionLag(lag, TimeUnit.MILLISECONDS);
+ managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(p.offload_threshold);
+ });
future.complete(managedLedgerConfig);
}, (exception) -> future.completeExceptionally(exception)));
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 5ee2f22..8a319c6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.admin;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
@@ -1212,4 +1213,72 @@ public interface Namespaces {
* Unexpected error
*/
void setOffloadThreshold(String namespace, long compactionThreshold) throws PulsarAdminException;
+
+ /**
+ * Get the offload deletion lag for a namespace, in milliseconds.
+ * The number of milliseconds to wait before deleting a ledger segment which has been
offloaded from
+ * the Pulsar cluster's local storage (i.e. BookKeeper).
+ *
+ * If the offload deletion lag has not been set for the namespace, the method returns
'null'
+ * and the namespace will use the configured default of the pulsar broker.
+ *
+ * A negative value disables deletion of the local ledger completely, though it will
still be deleted
+ * if it exceeds the topics retention policy, along with the offloaded copy.
+ *
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>3600000</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @return the offload deletion lag for the namespace in milliseconds, or null if not
set
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ Long getOffloadDeleteLagMs(String namespace) throws PulsarAdminException;
+
+ /**
+ * Set the offload deletion lag for a namespace.
+ *
+ * The offload deletion lag is the amount of time to wait after offloading a ledger segment
to long term storage,
+ * before deleting its copy stored on the Pulsar cluster's local storage (i.e. BookKeeper).
+ *
+ * A negative value disables deletion of the local ledger completely, though it will
still be deleted
+ * if it exceeds the topics retention policy, along with the offloaded copy.
+ *
+ * @param namespace
+ * Namespace name
+ * @param lag the duration to wait before deleting the local copy
+ * @param unit the timeunit of the duration
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setOffloadDeleteLag(String namespace, long lag, TimeUnit unit) throws PulsarAdminException;
+
+ /**
+ * Clear the offload deletion lag for a namespace.
+ *
+ * The namespace will fall back to using the configured default of the pulsar broker.
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void clearOffloadDeleteLag(String namespace) throws PulsarAdminException;
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 4ed0b8a..ba2950e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
@@ -701,6 +702,40 @@ public class NamespacesImpl extends BaseResource implements Namespaces
{
}
}
+ @Override
+ public Long getOffloadDeleteLagMs(String namespace) throws PulsarAdminException {
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+ return request(path).get(Long.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void setOffloadDeleteLag(String namespace, long lag, TimeUnit unit) throws PulsarAdminException
{
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+ request(path).put(Entity.entity(TimeUnit.MILLISECONDS.convert(lag, unit), MediaType.APPLICATION_JSON),
+ ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void clearOffloadDeleteLag(String namespace) throws PulsarAdminException {
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+ request(path).delete(ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
private WebTarget namespacePath(NamespaceName namespace, String... parts) {
final WebTarget base = namespace.isV2() ? adminV2Namespaces : adminNamespaces;
WebTarget namespacePath = base.path(namespace.toString());
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 4f898a5..341b249 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.pulsar.admin.cli.utils.IOUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -832,6 +833,53 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get offloadDeletionLag, in minutes, for a namespace")
+ private class GetOffloadDeletionLag extends CliCommand {
+ @Parameter(description = "tenant/namespace\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ Long lag = admin.namespaces().getOffloadDeleteLagMs(namespace);
+ if (lag != null) {
+ System.out.println(TimeUnit.MINUTES.convert(lag, TimeUnit.MILLISECONDS) +
" minute(s)");
+ } else {
+ System.out.println("Unset for namespace. Defaulting to broker setting.");
+ }
+ }
+ }
+
+ @Parameters(commandDescription = "Set offloadDeletionLag for a namespace")
+ private class SetOffloadDeletionLag extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--lag", "-l" },
+ description = "Duration to wait after offloading a ledger segment, before
deleting the copy of that"
+ + " segment from cluster local storage. (eg: 10m, 5h, 3d,
2w).",
+ required = true)
+ private String lag = "-1";
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().setOffloadDeleteLag(namespace, validateTimeString(lag), TimeUnit.MINUTES);
+ }
+ }
+
+ @Parameters(commandDescription = "Clear offloadDeletionLag for a namespace")
+ private class ClearOffloadDeletionLag extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().clearOffloadDeleteLag(namespace);
+ }
+ }
+
public CmdNamespaces(PulsarAdmin admin) {
super("namespaces", admin);
jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -897,5 +945,9 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("get-offload-threshold", new GetOffloadThreshold());
jcommander.addCommand("set-offload-threshold", new SetOffloadThreshold());
+ jcommander.addCommand("get-offload-deletion-lag", new GetOffloadDeletionLag());
+ jcommander.addCommand("set-offload-deletion-lag", new SetOffloadDeletionLag());
+ jcommander.addCommand("clear-offload-deletion-lag", new ClearOffloadDeletionLag());
+
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 35a752b..9c7efeb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -59,6 +59,7 @@ public class Policies {
public long compaction_threshold = 0;
public long offload_threshold = -1;
+ public Long offload_deletion_lag_ms = null;
@Override
public boolean equals(Object obj) {
@@ -80,7 +81,8 @@ public class Policies {
&& max_consumers_per_topic == other.max_consumers_per_topic
&& max_consumers_per_subscription == other.max_consumers_per_subscription
&& compaction_threshold == other.compaction_threshold
- && offload_threshold == other.offload_threshold;
+ && offload_threshold == other.offload_threshold
+ && offload_deletion_lag_ms == other.offload_deletion_lag_ms;
}
return false;
@@ -112,6 +114,7 @@ public class Policies {
.add("max_consumers_per_topic", max_consumers_per_topic)
.add("max_consumers_per_subscription", max_consumers_per_topic)
.add("compaction_threshold", compaction_threshold)
- .add("offload_threshold", offload_threshold).toString();
+ .add("offload_threshold", offload_threshold)
+ .add("offload_deletion_lag_ms", offload_deletion_lag_ms).toString();
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
index b595dd3..dd1cb7a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
@@ -18,12 +18,15 @@
*/
package org.apache.pulsar.tests.integration.offload;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -32,6 +35,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.containers.S3Container;
@@ -241,4 +245,109 @@ public class TestS3Offload extends PulsarClusterTestBase {
Assert.assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1L);
}
}
+
+ private boolean ledgerOffloaded(List<LedgerInfo> ledgers, long ledgerId) {
+ return ledgers.stream().filter(l -> l.ledgerId == ledgerId)
+ .map(l -> l.offloaded).findFirst().get();
+ }
+
+ private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic)
throws Exception {
+ try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+ Producer producer = client.newProducer().topic(topic)
+ .blockIfQueueFull(true).enableBatching(false).create();
+ PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+
+ List<LedgerInfo> ledgers = admin.topics().getInternalStats(topic).ledgers;
+ long currentLedger = ledgers.get(ledgers.size() - 1).ledgerId;
+
+ client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
+
+ // write enough to topic to make it roll twice
+ for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+ producer.sendAsync(buildEntry("offload-message" + i));
+ }
+ producer.send(buildEntry("final-offload-message"));
+
+ // wait up to 30 seconds for offload to occur
+ for (int i = 0;
+ i < 300 && !ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers,
currentLedger);
+ i++) {
+ Thread.sleep(100);
+ }
+ Assert.assertTrue(ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers,
currentLedger));
+
+ return currentLedger;
+ }
+ }
+
+ public boolean ledgerExistsInBookKeeper(long ledgerId) throws Exception {
+ ClientConfiguration bkConf = new ClientConfiguration();
+ bkConf.setZkServers(pulsarCluster.getZKConnString());
+ try (BookKeeperAdmin bk = new BookKeeperAdmin(bkConf)) {
+ try {
+ bk.openLedger(ledgerId).close();
+ return true;
+ } catch (BKException.BKNoSuchLedgerExistsException e) {
+ return false;
+ }
+ }
+ }
+
+ @Test(dataProvider = "ServiceAndAdminUrls")
+ public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl)
throws Exception {
+ final String tenant = "s3-offload-test-deletion-lag-" + randomName(4);
+ final String namespace = tenant + "/ns1";
+ final String topic = "persistent://" + namespace + "/topic1";
+
+ pulsarCluster.runAdminCommandOnAnyBroker("tenants",
+ "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+ "--admin-roles", "offload-admin", tenant);
+
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+ "create", "--clusters", pulsarCluster.getClusterName(), namespace);
+
+ // set threshold to offload runs immediately after role
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+ "set-offload-threshold", "--size", "0",
namespace);
+
+ String output = pulsarCluster.runAdminCommandOnAnyBroker(
+ "namespaces", "get-offload-deletion-lag", namespace).getStdout();
+ Assert.assertTrue(output.contains("Unset for namespace"));
+
+ long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+ // give it up to 5 seconds to delete, it shouldn't
+ // so we wait this every time
+ Thread.sleep(5000);
+ Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
+
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag",
namespace,
+ "--lag", "0m");
+ output = pulsarCluster.runAdminCommandOnAnyBroker(
+ "namespaces", "get-offload-deletion-lag", namespace).getStdout();
+ Assert.assertTrue(output.contains("0 minute(s)"));
+
+ offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+ // wait up to 10 seconds for ledger to be deleted
+ for (int i = 0; i < 10 && ledgerExistsInBookKeeper(offloadedLedger); i++)
{
+ writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+ Thread.sleep(1000);
+ }
+ Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger));
+
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "clear-offload-deletion-lag",
namespace);
+
+ Thread.sleep(5); // wait 5 seconds to allow broker to see update
+
+ output = pulsarCluster.runAdminCommandOnAnyBroker(
+ "namespaces", "get-offload-deletion-lag", namespace).getStdout();
+ Assert.assertTrue(output.contains("Unset for namespace"));
+
+ offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+
+ // give it up to 5 seconds to delete, it shouldn't
+ // so we wait this every time
+ Thread.sleep(5000);
+ Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
+ }
+
}
|