pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Fixed race condition in schema initialization in partitioned topics (#2959)
Date Thu, 08 Nov 2018 18:09:50 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b708b49  Fixed race condition in schema initialization in partitioned topics (#2959)
b708b49 is described below

commit b708b49638833a663c1e8ea43ba8e91af56eb93a
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Thu Nov 8 10:09:44 2018 -0800

    Fixed race condition in schema initialization in partitioned topics (#2959)
    
    * Fixed race condition in schema initialization in partitioned topics
    
    * Removed lombok log
    
    * Fixed tests
---
 .../service/schema/BookkeeperSchemaStorage.java    | 244 ++++++++++++++-------
 .../schema/PartitionedTopicsSchemaTest.java        |   4 +-
 .../client/impl/PartitionedProducerImpl.java       |   4 +-
 3 files changed, 173 insertions(+), 79 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index b8d9a27..f0e9699 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -53,11 +53,16 @@ import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class BookkeeperSchemaStorage implements SchemaStorage {
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperSchemaStorage.class);
+
     private static final String SchemaPath = "/schemas";
     private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
     private static final byte[] LedgerPassword = "".getBytes();
@@ -68,9 +73,6 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
     private final ServiceConfiguration config;
     private BookKeeper bookKeeper;
 
-
-    private final ConcurrentMap<String, CompletableFuture<LocatorEntry>> locatorEntries = new ConcurrentHashMap<>();
-
     private final ConcurrentMap<String, CompletableFuture<StoredSchema>> readSchemaOperations = new ConcurrentHashMap<>();
 
     @VisibleForTesting
@@ -124,9 +126,15 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
     private CompletableFuture<StoredSchema> getSchema(String schemaId) {
         // There's already a schema read operation in progress. Just piggyback on that
         return readSchemaOperations.computeIfAbsent(schemaId, key -> {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Fetching schema from store", schemaId);
+            }
             CompletableFuture<StoredSchema> future = new CompletableFuture<>();
 
             getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Got schema locator {}", schemaId, locator);
+                }
                 if (!locator.isPresent()) {
                     return completedFuture(null);
                 }
@@ -136,6 +144,10 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                         .thenApply(entry -> new StoredSchema(entry.getSchemaData().toByteArray(),
                                 new LongSchemaVersion(schemaLocator.getInfo().getVersion())));
             }).handleAsync((res, ex) -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Get operation completed. res={} -- ex={}", schemaId, res, ex);
+                }
+
                 // Cleanup the pending ops from the map
                 readSchemaOperations.remove(schemaId, future);
                 if (ex != null) {
@@ -165,7 +177,14 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
 
     @NotNull
     private CompletableFuture<StoredSchema> getSchema(String schemaId, long version) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Get schema - version: {}", schemaId, version);
+        }
+
         return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Get schema - version: {} - locator: {}", schemaId, version, locator);
+            }
 
             if (!locator.isPresent()) {
                 return completedFuture(null);
@@ -188,29 +207,116 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
 
     @NotNull
     private CompletableFuture<Long> putSchema(String schemaId, byte[] data, byte[] hash) {
-        return getOrCreateSchemaLocator(getSchemaPath(schemaId)).thenCompose(locatorEntry ->
-            addNewSchemaEntryToStore(locatorEntry.locator.getIndexList(), data).thenCompose(position ->
-                updateSchemaLocator(schemaId, locatorEntry, position, hash)
-            )
-        );
+        return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
+            if (optLocatorEntry.isPresent()) {
+                // Schema locator was already present
+                return addNewSchemaEntryToStore(optLocatorEntry.get().locator.getIndexList(), data)
+                        .thenCompose(position -> updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash));
+            } else {
+                // No schema was defined yet
+                CompletableFuture<Long> future = new CompletableFuture<>();
+                createNewSchema(schemaId, data, hash)
+                        .thenAccept(version -> future.complete(version))
+                        .exceptionally(ex -> {
+                            if (ex.getCause() instanceof NodeExistsException) {
+                                // There was a race condition on the schema creation. Since it has now been created,
+                                // retry the whole operation so that we have a chance to recover without bubbling error
+                                // back to producer/consumer
+                                putSchema(schemaId, data, hash)
+                                        .thenAccept(version -> future.complete(version))
+                                        .exceptionally(ex2 -> {
+                                            future.completeExceptionally(ex2);
+                                            return null;
+                                        });
+                            } else {
+                                // For other errors, just fail the operation
+                                future.completeExceptionally(ex);
+                            }
+
+                            return null;
+                        });
+
+                return future;
+            }
+        });
     }
 
     @NotNull
     private CompletableFuture<Long> putSchemaIfAbsent(String schemaId, byte[] data, byte[] hash) {
-        return getOrCreateSchemaLocator(getSchemaPath(schemaId)).thenCompose(locatorEntry -> {
-            byte[] storedHash = locatorEntry.locator.getInfo().getHash().toByteArray();
-            if (storedHash.length > 0 && Arrays.equals(storedHash, hash)) {
-                return completedFuture(locatorEntry.locator.getInfo().getVersion());
-            }
-            return findSchemaEntryByHash(locatorEntry.locator.getIndexList(), hash).thenCompose(version -> {
-                if (isNull(version)) {
-                    return addNewSchemaEntryToStore(locatorEntry.locator.getIndexList(), data).thenCompose(position ->
-                        updateSchemaLocator(schemaId, locatorEntry, position, hash)
-                    );
-                } else {
-                    return completedFuture(version);
+        return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
+
+            if (optLocatorEntry.isPresent()) {
+                // Schema locator was already present
+                SchemaStorageFormat.SchemaLocator locator = optLocatorEntry.get().locator;
+                byte[] storedHash = locator.getInfo().getHash().toByteArray();
+                if (storedHash.length > 0 && Arrays.equals(storedHash, hash)) {
+                    return completedFuture(locator.getInfo().getVersion());
                 }
-            });
+
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId, hash);
+                }
+
+                return findSchemaEntryByHash(locator.getIndexList(), hash).thenCompose(version -> {
+                    if (isNull(version)) {
+                        return addNewSchemaEntryToStore(locator.getIndexList(), data).thenCompose(
+                                position -> updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash));
+                    } else {
+                        return completedFuture(version);
+                    }
+                });
+            } else {
+                // No schema was defined yet
+                CompletableFuture<Long> future = new CompletableFuture<>();
+                createNewSchema(schemaId, data, hash)
+                        .thenAccept(version -> future.complete(version))
+                        .exceptionally(ex -> {
+                            if (ex.getCause() instanceof NodeExistsException) {
+                                // There was a race condition on the schema creation. Since it has now been created,
+                                // retry the whole operation so that we have a chance to recover without bubbling error
+                                // back to producer/consumer
+                                putSchemaIfAbsent(schemaId, data, hash)
+                                        .thenAccept(version -> future.complete(version))
+                                        .exceptionally(ex2 -> {
+                                            future.completeExceptionally(ex2);
+                                            return null;
+                                        });
+                            } else {
+                                // For other errors, just fail the operation
+                                future.completeExceptionally(ex);
+                            }
+
+                            return null;
+                        });
+
+                return future;
+            }
+        });
+    }
+
+    private CompletableFuture<Long> createNewSchema(String schemaId, byte[] data, byte[] hash) {
+        SchemaStorageFormat.IndexEntry emptyIndex = SchemaStorageFormat.IndexEntry.newBuilder()
+                        .setVersion(0)
+                        .setHash(copyFrom(hash))
+                        .setPosition(SchemaStorageFormat.PositionInfo.newBuilder()
+                                .setEntryId(-1L)
+                                .setLedgerId(-1L)
+                        ).build();
+
+        return addNewSchemaEntryToStore(Collections.singletonList(emptyIndex), data).thenCompose(position -> {
+            // The schema was stored in the ledger, now update the z-node with the pointer to it
+            SchemaStorageFormat.IndexEntry info = SchemaStorageFormat.IndexEntry.newBuilder()
+                    .setVersion(0)
+                    .setPosition(position)
+                    .setHash(copyFrom(hash))
+                    .build();
+
+            return createSchemaLocator(getSchemaPath(schemaId), SchemaStorageFormat.SchemaLocator.newBuilder()
+                    .setInfo(info)
+                    .addAllIndex(
+                            newArrayList(info))
+                    .build())
+                            .thenApply(ignore -> 0L);
         });
     }
 
@@ -226,7 +332,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
     }
 
     @NotNull
-    private String getSchemaPath(String schemaId) {
+    private static String getSchemaPath(String schemaId) {
         return SchemaPath + "/" + schemaId;
     }
 
@@ -310,8 +416,12 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
             }
         }
 
-        return readSchemaEntry(index.get(0).getPosition())
-            .thenCompose(entry -> findSchemaEntryByHash(entry.getIndexList(), hash));
+        if (index.get(0).getPosition().getLedgerId() == -1) {
+            return completedFuture(null);
+        } else {
+            return readSchemaEntry(index.get(0).getPosition())
+                    .thenCompose(entry -> findSchemaEntryByHash(entry.getIndexList(), hash));
+        }
 
     }
 
@@ -319,6 +429,10 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
     private CompletableFuture<SchemaStorageFormat.SchemaEntry> readSchemaEntry(
         SchemaStorageFormat.PositionInfo position
     ) {
+        if (log.isDebugEnabled()) {
+            log.debug("Reading schema entry from {}", position);
+        }
+
         return openLedger(position.getLedgerId())
             .thenCompose((ledger) ->
                 Functions.getLedgerEntry(ledger, position.getEntryId())
@@ -343,6 +457,24 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
     }
 
     @NotNull
+    private CompletableFuture<LocatorEntry> createSchemaLocator(String id, SchemaStorageFormat.SchemaLocator locator) {
+        CompletableFuture<LocatorEntry> future = new CompletableFuture<>();
+
+        ZkUtils.asyncCreateFullPathOptimistic(zooKeeper, id, locator.toByteArray(), Acl,
+                CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
+                    Code code = Code.get(rc);
+                    if (code != Code.OK) {
+                        future.completeExceptionally(KeeperException.create(code));
+                    } else {
+                        // Newly created z-node will have version 0
+                        future.complete(new LocatorEntry(locator, 0));
+                    }
+                }, null);
+
+        return future;
+    }
+
+    @NotNull
     private CompletableFuture<Optional<LocatorEntry>> getSchemaLocator(String schema) {
         return localZkCache.getEntryAsync(schema, new SchemaLocatorDeserializer()).thenApply(optional ->
             optional.map(entry -> new LocatorEntry(entry.getKey(), entry.getValue().getVersion()))
@@ -350,57 +482,12 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
     }
 
     @NotNull
-    private CompletableFuture<LocatorEntry> getOrCreateSchemaLocator(String schema) {
-        // Protect from concurrent schema locator creation
-        return locatorEntries.computeIfAbsent(schema, key -> {
-            CompletableFuture<LocatorEntry> future = new CompletableFuture<>();
-
-            getSchemaLocator(schema).thenCompose(schemaLocatorStatEntry -> {
-                if (schemaLocatorStatEntry.isPresent()) {
-                    return completedFuture(schemaLocatorStatEntry.get());
-                } else {
-                    SchemaStorageFormat.SchemaLocator locator = SchemaStorageFormat.SchemaLocator.newBuilder()
-                            .setInfo(SchemaStorageFormat.IndexEntry.newBuilder().setVersion(-1L)
-                                    .setHash(ByteString.EMPTY).setPosition(SchemaStorageFormat.PositionInfo.newBuilder()
-                                            .setEntryId(-1L).setLedgerId(-1L)))
-                            .build();
-
-                    CompletableFuture<LocatorEntry> zkFuture = new CompletableFuture<>();
-
-                    ZkUtils.asyncCreateFullPathOptimistic(zooKeeper, schema, locator.toByteArray(), Acl,
-                            CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
-                                Code code = Code.get(rc);
-                                if (code != Code.OK) {
-                                    zkFuture.completeExceptionally(KeeperException.create(code));
-                                } else {
-                                    zkFuture.complete(new LocatorEntry(locator, -1));
-                                }
-                            }, null);
-
-                    return zkFuture;
-                }
-            }).handleAsync((res, ex) -> {
-                // Cleanup the pending ops from the map
-                locatorEntries.remove(schema, future);
-                if (ex != null) {
-                    future.completeExceptionally(ex);
-                } else {
-                    future.complete(res);
-                }
-                return null;
-            });
-
-            return future;
-        });
-    }
-
-    @NotNull
     private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, SchemaStorageFormat.SchemaEntry entry) {
         final CompletableFuture<Long> future = new CompletableFuture<>();
         ledgerHandle.asyncAddEntry(entry.toByteArray(),
             (rc, handle, entryId, ctx) -> {
                 if (rc != BKException.Code.OK) {
-                    future.completeExceptionally(BKException.create(rc));
+                    future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId(), -1));
                 } else {
                     future.complete(entryId);
                 }
@@ -420,7 +507,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
             LedgerPassword,
             (rc, handle, ctx) -> {
                 if (rc != BKException.Code.OK) {
-                    future.completeExceptionally(BKException.create(rc));
+                    future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1));
                 } else {
                     future.complete(handle);
                 }
@@ -438,7 +525,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
             LedgerPassword,
             (rc, handle, ctx) -> {
                 if (rc != BKException.Code.OK) {
-                    future.completeExceptionally(BKException.create(rc));
+                    future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId, -1));
                 } else {
                     future.complete(handle);
                 }
@@ -452,7 +539,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
         CompletableFuture<Void> future = new CompletableFuture<>();
         ledgerHandle.asyncClose((rc, handle, ctx) -> {
             if (rc != BKException.Code.OK) {
-                future.completeExceptionally(BKException.create(rc));
+                future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId(), -1));
             } else {
                 future.complete(null);
             }
@@ -466,7 +553,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
             ledger.asyncReadEntries(entry, entry,
                 (rc, handle, entries, ctx) -> {
                     if (rc != BKException.Code.OK) {
-                        future.completeExceptionally(BKException.create(rc));
+                        future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId(), entry));
                     } else {
                         future.complete(entries.nextElement());
                     }
@@ -519,4 +606,13 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
             this.zkZnodeVersion = zkZnodeVersion;
         }
     }
+
+    public static Exception bkException(String operation, int rc, long ledgerId, long entryId) {
+        String message = org.apache.bookkeeper.client.api.BKException.getMessage(rc) + " -  ledger=" + ledgerId;
+
+        if (entryId != -1) {
+            message += " - entry=" + entryId;
+        }
+        return new IOException(message);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/PartitionedTopicsSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/PartitionedTopicsSchemaTest.java
index e7723d7..6e9b122 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/PartitionedTopicsSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/PartitionedTopicsSchemaTest.java
@@ -38,10 +38,8 @@ public class PartitionedTopicsSchemaTest extends BrokerBkEnsemblesTests {
 
     /**
      * Test that sequence id from a producer is correct when there are send errors
-     *
-     * the test is disabled {@link https://github.com/apache/pulsar/issues/2651}
      */
-    @Test(enabled = false)
+    @Test
     public void partitionedTopicWithSchema() throws Exception {
         admin.namespaces().createNamespace("prop/my-test", Collections.singleton("usc"));
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index a6f40fe..3b87e4e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -126,15 +126,15 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
                 if (completed.incrementAndGet() == topicMetadata.numPartitions()) {
                     if (createFail.get() == null) {
                         setState(State.Ready);
-                        producerCreatedFuture().complete(PartitionedProducerImpl.this);
                         log.info("[{}] Created partitioned producer", topic);
+                        producerCreatedFuture().complete(PartitionedProducerImpl.this);
                     } else {
+                        log.error("[{}] Could not create partitioned producer.", topic, createFail.get().getCause());
                         closeAsync().handle((ok, closeException) -> {
                             producerCreatedFuture().completeExceptionally(createFail.get());
                             client.cleanupProducer(this);
                             return null;
                         });
-                        log.error("[{}] Could not create partitioned producer.", topic, createFail.get().getCause());
                     }
                 }
 


Mime
View raw message