pulsar-commits mailing list archives

From: GitBox <...@apache.org>
Subject: [GitHub] sijie closed pull request #3036: Use topic unload instead of broker restarts in TestS3Offload
Date: Sun, 25 Nov 2018 02:49:00 GMT
sijie closed pull request #3036: Use topic unload instead of broker restarts in TestS3Offload
URL: https://github.com/apache/pulsar/pull/3036
 
 
   

This is a pull request merged from a forked repository. Because GitHub hides
the original diff once a fork's pull request is merged, the diff is reproduced
below for the sake of provenance. A condensed sketch of the resulting pattern
follows the diff:

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
index 1bae448723..6a58f93143 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
@@ -92,9 +92,8 @@ public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUr
 
         long firstLedger = -1;
         try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Producer producer = client.newProducer().topic(topic)
-                .blockIfQueueFull(true).enableBatching(false).create();
-            PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+            Producer<byte[]> producer = client.newProducer().topic(topic)
+                .blockIfQueueFull(true).enableBatching(false).create();) {
             client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
 
             // write enough to topic to make it roll
@@ -102,8 +101,10 @@ public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUr
             for (; i < ENTRIES_PER_LEDGER * 1.5; i++) {
                 producer.sendAsync(buildEntry("offload-message" + i));
             }
-            MessageId latestMessage = producer.send(buildEntry("offload-message" + i));
+            producer.flush();
+        }
 
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
             // read managed ledger info, check ledgers exist
             firstLedger = admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;
 
@@ -125,27 +126,24 @@ public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUr
             output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
                 "offload-status", "-w", topic).getStdout();
             Assert.assertTrue(output.contains("Offload was a success"));
-        }
 
-        // stop brokers to clear all caches, open handles, etc
-        pulsarCluster.stopAllBrokers();
+            // delete the first ledger, so that we cannot possibly read from it
+            ClientConfiguration bkConf = new ClientConfiguration();
+            bkConf.setZkServers(pulsarCluster.getZKConnString());
+            try (BookKeeper bk = new BookKeeper(bkConf)) {
+                bk.deleteLedger(firstLedger);
+            }
 
-        // delete the first ledger, so that we cannot possibly read from it
-        ClientConfiguration bkConf = new ClientConfiguration();
-        bkConf.setZkServers(pulsarCluster.getZKConnString());
-        try (BookKeeper bk = new BookKeeper(bkConf)) {
-            bk.deleteLedger(firstLedger);
+            // Unload topic to clear all caches, open handles, etc
+            admin.topics().unload(topic);
         }
 
-        // start all brokers again
-        pulsarCluster.startAllBrokers();
-
         log.info("Read back the data (which would be in that first ledger)");
         try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Consumer consumer = client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
+            Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
             // read back from topic
             for (int i = 0; i < ENTRIES_PER_LEDGER * 1.5; i++) {
-                Message m = consumer.receive(1, TimeUnit.MINUTES);
+                Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
                 Assert.assertEquals(buildEntry("offload-message" + i), m.getData());
             }
         }
@@ -169,9 +167,9 @@ public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String a
 
         long firstLedger = 0;
         try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Producer producer = client.newProducer().topic(topic)
+            Producer<byte[]> producer = client.newProducer().topic(topic)
                 .blockIfQueueFull(true).enableBatching(false).create();
-            PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+            ) {
 
             client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
 
@@ -179,8 +177,11 @@ public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String a
             for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
                 producer.sendAsync(buildEntry("offload-message" + i));
             }
-            producer.send(buildEntry("final-offload-message"));
 
+            producer.flush();
+        }
+
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
             firstLedger = admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;
 
             // wait up to 30 seconds for offload to occur
@@ -188,27 +189,24 @@ public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String a
                 Thread.sleep(100);
             }
             Assert.assertTrue(admin.topics().getInternalStats(topic).ledgers.get(0).offloaded);
-        }
 
-        // stop brokers to clear all caches, open handles, etc
-        pulsarCluster.stopAllBrokers();
+            // delete the first ledger, so that we cannot possibly read from it
+            ClientConfiguration bkConf = new ClientConfiguration();
+            bkConf.setZkServers(pulsarCluster.getZKConnString());
+            try (BookKeeper bk = new BookKeeper(bkConf)) {
+                bk.deleteLedger(firstLedger);
+            }
 
-        // delete the first ledger, so that we cannot possibly read from it
-        ClientConfiguration bkConf = new ClientConfiguration();
-        bkConf.setZkServers(pulsarCluster.getZKConnString());
-        try (BookKeeper bk = new BookKeeper(bkConf)) {
-            bk.deleteLedger(firstLedger);
+            // Unload topic to clear all caches, open handles, etc
+            admin.topics().unload(topic);
         }
 
-        // start all brokers again
-        pulsarCluster.startAllBrokers();
-
         log.info("Read back the data (which would be in that first ledger)");
         try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
-             Consumer consumer = client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
+             Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
             // read back from topic
             for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
-                Message m = consumer.receive(1, TimeUnit.MINUTES);
+                Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
                 Assert.assertEquals(buildEntry("offload-message" + i), m.getData());
             }
         }
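
For reference, the substance of the change is: publish asynchronously and call
producer.flush() before closing the client, then use the admin API's
admin.topics().unload(topic) so the owning broker drops its managed-ledger
caches and open handles, instead of stopping and restarting every broker.
Below is a minimal sketch of that pattern, assuming placeholder service/admin
URLs, topic name, and class name (none of these are taken from the test itself):

    import org.apache.pulsar.client.admin.PulsarAdmin;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;

    public class UnloadInsteadOfRestartSketch {  // hypothetical class name
        public static void main(String[] args) throws Exception {
            String serviceUrl = "pulsar://localhost:6650";              // assumption: placeholder
            String adminUrl   = "http://localhost:8080";                // assumption: placeholder
            String topic      = "persistent://public/default/my-topic"; // assumption: placeholder

            // Publish asynchronously, then flush so every pending send completes
            // before the producer and client are closed.
            try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
                 Producer<byte[]> producer = client.newProducer().topic(topic)
                     .blockIfQueueFull(true).enableBatching(false).create()) {
                for (int i = 0; i < 10; i++) {
                    producer.sendAsync(("offload-message" + i).getBytes());
                }
                producer.flush();
            }

            // Unload the topic so the owning broker releases caches and open
            // ledger handles, rather than restarting all brokers in the cluster.
            try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
                admin.topics().unload(topic);
            }
        }
    }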


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
