pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [incubator-pulsar] branch branch-2.1 updated: Make sure schema is initialized before the topic is loaded (#2203)
Date Mon, 27 Aug 2018 08:44:15 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7e66b3e  Make sure schema is initialized before the topic is loaded (#2203)
7e66b3e is described below

commit 7e66b3e17da65c3a00d965f32fe7e795240ead86
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Tue Jul 24 22:03:43 2018 -0700

    Make sure schema is initialized before the topic is loaded (#2203)
    
    * Make sure schema is initialized before the topic is loaded
    
    * Added overloaded method
---
 .../org/apache/pulsar/broker/PulsarService.java    |  2 +-
 .../broker/admin/impl/PersistentTopicsBase.java    |  4 +--
 .../pulsar/broker/service/BrokerService.java       | 41 +++++++++++++++++-----
 .../apache/pulsar/broker/service/ServerCnx.java    |  4 +--
 .../service/nonpersistent/NonPersistentTopic.java  |  8 +++--
 .../broker/service/persistent/PersistentTopic.java |  4 +++
 .../broker/service/BrokerBkEnsemblesTests.java     |  6 ++--
 .../pulsar/broker/service/BrokerServiceTest.java   | 11 +++---
 8 files changed, 56 insertions(+), 24 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2a341af..bbc3cfd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -563,7 +563,7 @@ public class PulsarService implements AutoCloseable {
                 try {
                     TopicName topicName = TopicName.get(topic);
                     if (bundle.includes(topicName)) {
-                        CompletableFuture<Topic> future = brokerService.getOrCreateTopic(topic);
+                        CompletableFuture<Topic> future = brokerService.getOrCreateTopic(topic, null);
                         if (future != null) {
                             persistentTopics.add(future);
                         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 946980a..cf8355f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -486,7 +486,7 @@ public class PersistentTopicsBase extends AdminResource {
             internalDeleteTopic(authoritative);
         }
     }
-    
+
     protected void internalDeleteTopic(boolean authoritative) {
         validateAdminOperationOnTopic(authoritative);
         Topic topic = getTopicReference(topicName);
@@ -1191,7 +1191,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     private Topic getOrCreateTopic(TopicName topicName) {
-        return pulsar().getBrokerService().getOrCreateTopic(topicName.toString()).join();
+        return pulsar().getBrokerService().getOrCreateTopic(topicName.toString(), null).join();
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 741afbf..2f08137 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -112,6 +112,7 @@ import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.schema.SchemaData;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FieldParser;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -450,14 +451,19 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     }
 
     public CompletableFuture<Optional<Topic>> getTopicIfExists(final String topic) {
-        return getTopic(topic, false /* createIfMissing */ );
+        return getTopic(topic, false /* createIfMissing */, null /* schemaData */ );
     }
 
     public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
-        return getTopic(topic, true /* createIfMissing */ ).thenApply(Optional::get);
+        return getOrCreateTopic(topic, null);
     }
 
-    private CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing) {
+    public CompletableFuture<Topic> getOrCreateTopic(final String topic, SchemaData schemaData) {
+        return getTopic(topic, true /* createIfMissing */, schemaData ).thenApply(Optional::get);
+    }
+
+    private CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing,
+            SchemaData schemaData) {
         try {
             CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
             if (topicFuture != null) {
@@ -471,8 +477,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             }
             final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent);
             return topics.computeIfAbsent(topic, (topicName) -> {
-                return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing)
-                        : createNonPersistentTopic(topicName);
+                return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing, schemaData)
+                        : createNonPersistentTopic(topicName, schemaData);
             });
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Illegalargument exception when loading topic", topic, e);
@@ -489,7 +495,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         }
     }
 
-    private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
+    private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic, SchemaData schemaData) {
         CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
 
         if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
@@ -520,7 +526,15 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             return null;
         });
 
-        return topicFuture;
+        return topicFuture.thenCompose(ot -> {
+            if (ot.isPresent()) {
+                // If a schema is provided, add or validate it before the
+                // topic is "visible"
+                return ot.get().addSchema(schemaData).thenApply(schemaVersion -> ot);
+            } else {
+                return CompletableFuture.completedFuture(ot);
+            }
+        });
     }
 
     private static <T> CompletableFuture<T> failedFuture(Throwable t) {
@@ -577,7 +591,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
      * @return CompletableFuture<Topic>
      * @throws RuntimeException
      */
-    protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final String topic, boolean createIfMissing) throws RuntimeException {
+    protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final String topic,
+            boolean createIfMissing, SchemaData schemaData) throws RuntimeException {
         checkTopicNsOwnership(topic);
 
         final CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
@@ -605,7 +620,15 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 log.debug("topic-loading for {} added into pending queue", topic);
             }
         }
-        return topicFuture;
+        return topicFuture.thenCompose(ot -> {
+            if (ot.isPresent()) {
+                // If a schema is provided, add or validate it before the
+                // topic is "visible"
+                return ot.get().addSchema(schemaData).thenApply(schemaVersion -> ot);
+            } else {
+                return CompletableFuture.completedFuture(ot);
+            }
+        });
     }
 
     private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture<Optional<Topic>> topicFuture) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4b45f72..de48f33 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -578,7 +578,7 @@ public class ServerCnx extends PulsarHandler {
                             }
                         }
 
-                        service.getOrCreateTopic(topicName.toString())
+                        service.getOrCreateTopic(topicName.toString(), schema)
                                 .thenCompose(topic -> {
                                     if (schema != null) {
                                         return topic.isSchemaCompatible(schema).thenCompose(isCompatible -> {
@@ -785,7 +785,7 @@ public class ServerCnx extends PulsarHandler {
 
                         log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
 
-                        service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
+                        service.getOrCreateTopic(topicName.toString(), schema).thenAccept((Topic topic) -> {
                             // Before creating producer, check if backlog quota exceeded
                             // on topic
                             if (topic.isBacklogQuotaExceeded(producerName)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index de55df3..e13549e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -415,14 +415,14 @@ public class NonPersistentTopic implements Topic {
 
     /**
      * Forcefully close all producers/consumers/replicators and deletes the topic.
-     * 
+     *
      * @return
      */
     @Override
     public CompletableFuture<Void> deleteForcefully() {
         return delete(false, true);
     }
-    
+
     private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected) {
         CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
 
@@ -1016,6 +1016,10 @@ public class NonPersistentTopic implements Topic {
 
     @Override
     public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+        if (schema == null) {
+            return CompletableFuture.completedFuture(SchemaVersion.Empty);
+        }
+
         String base = TopicName.get(getName()).getPartitionedTopicName();
         String id = TopicName.get(base).getSchemaName();
         return brokerService.pulsar()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c66e10b..4d289c4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1792,6 +1792,10 @@ public class PersistentTopic implements Topic, AddEntryCallback {
 
     @Override
     public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+        if (schema == null) {
+            return CompletableFuture.completedFuture(SchemaVersion.Empty);
+        }
+
         String base = TopicName.get(getName()).getPartitionedTopicName();
         String id = TopicName.get(base).getSchemaName();
         return brokerService.pulsar()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 4ecdae4..ce023c3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -171,7 +171,7 @@ public class BrokerBkEnsemblesTests {
             consumer.acknowledge(msg);
         }
 
-        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
         ManagedCursorImpl cursor = (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
         retryStrategically((test) -> cursor.getState().equals("Open"), 5, 100);
 
@@ -206,7 +206,7 @@ public class BrokerBkEnsemblesTests {
         }
 
         // (5) Broker should create new cursor-ledger and remove old cursor-ledger
-        topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+        topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
         final ManagedCursorImpl cursor1 = (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
         retryStrategically((test) -> cursor1.getState().equals("Open"), 5, 100);
         long newCursorLedgerId = cursor1.getCursorLedger();
@@ -254,7 +254,7 @@ public class BrokerBkEnsemblesTests {
         Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
                 .receiverQueueSize(5).subscribe();
 
-        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
         ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
         ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
         Field configField = ManagedCursorImpl.class.getDeclaredField("config");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 9fa40d4..b9b1b84 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -105,7 +105,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         BrokerService service = pulsar.getBrokerService();
 
         final CountDownLatch latch1 = new CountDownLatch(1);
-        service.getOrCreateTopic(topic).thenAccept(t -> {
+        service.getOrCreateTopic(topic, null).thenAccept(t -> {
             latch1.countDown();
             fail("should fail as NS is not owned");
         }).exceptionally(exception -> {
@@ -118,7 +118,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         admin.lookups().lookupTopic(topic);
 
         final CountDownLatch latch2 = new CountDownLatch(1);
-        service.getOrCreateTopic(topic).thenAccept(t -> {
+        service.getOrCreateTopic(topic, null).thenAccept(t -> {
             try {
                 assertNotNull(service.getTopicReference(topic));
             } catch (Exception e) {
@@ -746,7 +746,8 @@ public class BrokerServiceTest extends BrokerTestBase {
         pulsar.getNamespaceService().getOwnershipCache().updateBundleState(bundle, false);
 
         // try to create topic which should fail as bundle is disable
-        CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true);
+        CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService()
+                .loadOrCreatePersistentTopic(topicName, true, null);
 
         try {
             futureResult.get();
@@ -789,7 +790,7 @@ public class BrokerServiceTest extends BrokerTestBase {
 
         // create topic async and wait on the future completion
         executor.submit(() -> {
-            service.getOrCreateTopic(deadLockTestTopic).thenAccept(topic -> topicCreation.complete(null)).exceptionally(e -> {
+            service.getOrCreateTopic(deadLockTestTopic, null).thenAccept(topic -> topicCreation.complete(null)).exceptionally(e -> {
                 topicCreation.completeExceptionally(e.getCause());
                 return null;
             });
@@ -841,7 +842,7 @@ public class BrokerServiceTest extends BrokerTestBase {
 
         // create topic async and wait on the future completion
         executor.submit(() -> {
-            service.getOrCreateTopic(deadLockTestTopic).thenAccept(topic -> topicCreation.complete(null)).exceptionally(e -> {
+            service.getOrCreateTopic(deadLockTestTopic, null).thenAccept(topic -> topicCreation.complete(null)).exceptionally(e -> {
                 topicCreation.completeExceptionally(e.getCause());
                 return null;
             });


Mime
View raw message