pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: [Issue 3458] Tag Pulsar ledgers in order to distinguish them from other ledgers in the same BookKeeper cluster (#3525)
Date Wed, 13 Feb 2019 14:52:17 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4338052  [Issue 3458] Tag Pulsar ledgers in order to distinguish them from other ledgers
in the same BookKeeper cluster (#3525)
4338052 is described below

commit 43380523c5269c152f61b2aa8f7b70281c770d1d
Author: Enrico Olivelli <eolivelli@gmail.com>
AuthorDate: Wed Feb 13 15:52:11 2019 +0100

    [Issue 3458] Tag Pulsar ledgers in order to distinguish them from other ledgers in the same
BookKeeper cluster (#3525)
    
    Fixes #3458
    
    ### Motivation
    
    See #3458
    
    ### Modifications
    
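    Add a new LedgerMetadataUtils class which holds the logic for building the custom "metadata" to be
attached to BookKeeper ledgers created by Pulsar (managed ledgers, cursors, compacted ledgers and schemas).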
    
    ### Verifying this change
    
    This change is a trivial rework / code cleanup without any test coverage.
---
 .../mledger/impl/LedgerMetadataUtils.java          | 104 +++++++++++++++++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   5 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  20 ++--
 .../service/schema/BookkeeperSchemaStorage.java    |  17 ++--
 .../pulsar/compaction/TwoPhaseCompactor.java       |   8 +-
 site2/docs/cookbooks-bookkeepermetadata.md         |  20 ++++
 site2/website/sidebars.json                        |   3 +-
 7 files changed, 157 insertions(+), 20 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
new file mode 100644
index 0000000..3a245d1
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.collect.ImmutableMap;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Utilities for managing BookKeeper Ledgers custom metadata.
+ */
+public final class LedgerMetadataUtils {
+
+    private static final String METADATA_PROPERTY_APPLICATION = "application";
+    private static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR
+            = "pulsar".getBytes(StandardCharsets.UTF_8);
+
+    private static final String METADATA_PROPERTY_COMPONENT = "component";
+    private static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER
+            = "managed-ledger".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER
+            = "compacted-ledger".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA
+            = "schema".getBytes(StandardCharsets.UTF_8);
+
+    private static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME = "pulsar/managed-ledger";
+    private static final String METADATA_PROPERTY_CURSOR_NAME = "pulsar/cursor";
+    private static final String METADATA_PROPERTY_COMPACTEDTOPIC = "pulsar/compactedTopic";
+    private static final String METADATA_PROPERTY_COMPACTEDTO = "pulsar/compactedTo";
+    private static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId";
+
+    /**
+     * Build base metadata for every ManagedLedger.
+     *
+     * @param name the name of the ledger
+     * @return an immutable map which describes a ManagedLedger
+     */
+    static Map<String, byte[]> buildBaseManagedLedgerMetadata(String name) {
+        return ImmutableMap.of(
+                METADATA_PROPERTY_APPLICATION, METADATA_PROPERTY_APPLICATION_PULSAR,
+                METADATA_PROPERTY_COMPONENT, METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER,
+                METADATA_PROPERTY_MANAGED_LEDGER_NAME, name.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Build additional metadata for a Cursor.
+     *
+     * @param name the name of the cursor
+     * @return an immutable map which describes the cursor
+     * @see #buildBaseManagedLedgerMetadata(java.lang.String)
+     */
+    static Map<String, byte[]> buildAdditionalMetadataForCursor(String name) {
+        return ImmutableMap.of(METADATA_PROPERTY_CURSOR_NAME, name.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Build additional metadata for a CompactedLedger.
+     *
+     * @param compactedTopic reference to the compacted topic.
+     * @param compactedToMessageId last messageId.
+     * @return an immutable map which describes the compacted ledger
+     */
+    public static Map<String, byte[]> buildMetadataForCompactedLedger(String compactedTopic,
byte[] compactedToMessageId) {
+        return ImmutableMap.of(
+                METADATA_PROPERTY_APPLICATION, METADATA_PROPERTY_APPLICATION_PULSAR,
+                METADATA_PROPERTY_COMPONENT, METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER,
+                METADATA_PROPERTY_COMPACTEDTOPIC, compactedTopic.getBytes(StandardCharsets.UTF_8),
+                METADATA_PROPERTY_COMPACTEDTO, compactedToMessageId
+        );
+    }
+
+    /**
+     * Build additional metadata for a Schema
+     *
+     * @param schemaId id of the schema
+     * @return an immutable map which describes the schema
+     */
+    public static Map<String, byte[]> buildMetadataForSchema(String schemaId) {
+        return ImmutableMap.of(
+                METADATA_PROPERTY_APPLICATION, METADATA_PROPERTY_APPLICATION_PULSAR,
+                METADATA_PROPERTY_COMPONENT, METADATA_PROPERTY_COMPONENT_SCHEMA,
+                METADATA_PROPERTY_SCHEMAID, schemaId.getBytes(StandardCharsets.UTF_8)
+        );
+    }
+
+    private LedgerMetadataUtils() {}
+
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 01b3a9c..1d5fe67 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -29,6 +29,7 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
@@ -37,6 +38,7 @@ import com.google.common.collect.Sets;
 import com.google.common.collect.TreeRangeSet;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.protobuf.InvalidProtocolBufferException;
+import java.nio.charset.StandardCharsets;
 
 import java.time.Clock;
 import java.util.ArrayDeque;
@@ -2015,7 +2017,6 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     void createNewMetadataLedger(final VoidCallback callback) {
         ledger.mbean.startCursorLedgerCreateOp();
-
         ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {
 
             if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
@@ -2081,7 +2082,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     }
                 });
             }));
-        }, Collections.emptyMap());
+        }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
 
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 26b3acc..e1376f9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -20,7 +20,6 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.lang.Math.min;
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
 import java.time.Clock;
@@ -106,7 +105,6 @@ import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -119,6 +117,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
 import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
 
@@ -131,6 +131,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback
{
 
     protected final BookKeeper bookKeeper;
     protected final String name;
+    private final Map<String, byte[]> ledgerMetadata;
     private final BookKeeper.DigestType digestType;
 
     protected ManagedLedgerConfig config;
@@ -249,6 +250,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback
{
         this.config = config;
         this.store = store;
         this.name = name;
+        this.ledgerMetadata = LedgerMetadataUtils.buildBaseManagedLedgerMetadata(name);
         this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
         this.scheduledExecutor = scheduledExecutor;
         this.executor = orderedExecutor;
@@ -439,7 +441,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback
{
                 // Save it back to ensure all nodes exist
                 store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, storeLedgersCb);
             }));
-        }, Collections.emptyMap());
+        }, ledgerMetadata);
     }
 
     private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback)
{
@@ -3013,13 +3015,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback
{
      * @param config
      * @param digestType
      * @param cb
-     * @param emptyMap
+     * @param metadata
      */
     protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig config, DigestType
digestType,
-            CreateCallback cb, Map<Object, Object> emptyMap) {
+            CreateCallback cb, Map<String, byte[]> metadata) {
         AtomicBoolean ledgerCreated = new AtomicBoolean(false);
+        Map<String, byte[]> finalMetadata = new HashMap<>();
+        finalMetadata.putAll(ledgerMetadata);
+        finalMetadata.putAll(metadata);
+        if (log.isDebugEnabled()) {
+            log.debug("creating ledger, metadata: "+finalMetadata);
+        }
         bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
config.getAckQuorumSize(),
-                digestType, config.getPassword(), cb, ledgerCreated, Collections.emptyMap());
+                digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
         scheduledExecutor.schedule(() -> {
             if (!ledgerCreated.get()) {
                 ledgerCreated.set(true);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index f0e9699..47fcb62 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -27,13 +27,13 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -210,7 +211,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
         return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry ->
{
             if (optLocatorEntry.isPresent()) {
                 // Schema locator was already present
-                return addNewSchemaEntryToStore(optLocatorEntry.get().locator.getIndexList(),
data)
+                return addNewSchemaEntryToStore(schemaId, optLocatorEntry.get().locator.getIndexList(),
data)
                         .thenCompose(position -> updateSchemaLocator(schemaId, optLocatorEntry.get(),
position, hash));
             } else {
                 // No schema was defined yet
@@ -259,7 +260,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
 
                 return findSchemaEntryByHash(locator.getIndexList(), hash).thenCompose(version
-> {
                     if (isNull(version)) {
-                        return addNewSchemaEntryToStore(locator.getIndexList(), data).thenCompose(
+                        return addNewSchemaEntryToStore(schemaId, locator.getIndexList(),
data).thenCompose(
                                 position -> updateSchemaLocator(schemaId, optLocatorEntry.get(),
position, hash));
                     } else {
                         return completedFuture(version);
@@ -303,7 +304,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                                 .setLedgerId(-1L)
                         ).build();
 
-        return addNewSchemaEntryToStore(Collections.singletonList(emptyIndex), data).thenCompose(position
-> {
+        return addNewSchemaEntryToStore(schemaId, Collections.singletonList(emptyIndex),
data).thenCompose(position -> {
             // The schema was stored in the ledger, now update the z-node with the pointer
to it
             SchemaStorageFormat.IndexEntry info = SchemaStorageFormat.IndexEntry.newBuilder()
                     .setVersion(0)
@@ -338,11 +339,12 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
 
     @NotNull
     private CompletableFuture<SchemaStorageFormat.PositionInfo> addNewSchemaEntryToStore(
+        String schemaId,
         List<SchemaStorageFormat.IndexEntry> index,
         byte[] data
     ) {
         SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, data);
-        return createLedger().thenCompose(ledgerHandle ->
+        return createLedger(schemaId).thenCompose(ledgerHandle ->
             addEntry(ledgerHandle, schemaEntry).thenApply(entryId ->
                 Functions.newPositionInfo(ledgerHandle.getId(), entryId)
             )
@@ -497,7 +499,8 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
     }
 
     @NotNull
-    private CompletableFuture<LedgerHandle> createLedger() {
+    private CompletableFuture<LedgerHandle> createLedger(String schemaId) {
+        Map<String, byte[]> metadata = LedgerMetadataUtils.buildMetadataForSchema(schemaId);
         final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         bookKeeper.asyncCreateLedger(
             config.getManagedLedgerDefaultEnsembleSize(),
@@ -511,7 +514,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                 } else {
                     future.complete(handle);
                 }
-            }, null, Collections.emptyMap()
+            }, null, metadata
         );
         return future;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index d7e9bb1..b070f3b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeoutException;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.api.MessageId;
@@ -129,7 +130,7 @@ public class TwoPhaseCompactor extends Compactor {
                             Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
                             if (keyAndSize != null) {
                                 if(keyAndSize.getRight() > 0) {
-                                    latestForKey.put(keyAndSize.getLeft(), id);    
+                                    latestForKey.put(keyAndSize.getLeft(), id);
                                 } else {
                                     deletedMessage = true;
                                     latestForKey.remove(keyAndSize.getLeft());
@@ -165,8 +166,7 @@ public class TwoPhaseCompactor extends Compactor {
 
     private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId
to, MessageId lastReadId,
             Map<String, MessageId> latestForKey, BookKeeper bk) {
-        Map<String, byte[]> metadata = ImmutableMap.of("compactedTopic", reader.getTopic().getBytes(UTF_8),
-                "compactedTo", to.toByteArray());
+        Map<String, byte[]> metadata = LedgerMetadataUtils.buildMetadataForCompactedLedger(reader.getTopic(),
to.toByteArray());
         return createLedger(bk, metadata).thenCompose((ledger) -> {
             log.info("Commencing phase two of compaction for {}, from {} to {}, compacting
{} keys to ledger {}",
                     reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
@@ -228,7 +228,7 @@ public class TwoPhaseCompactor extends Compactor {
                         MessageId msg;
                         if (keyAndSize == null) { // pass through messages without a key
                             messageToAdd = Optional.of(m);
-                        } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null

+                        } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
                                 && msg.equals(id)) { // consider message only if
present into latestForKey map
                             if (keyAndSize.getRight() <= 0) {
                                 promise.completeExceptionally(new IllegalArgumentException(
diff --git a/site2/docs/cookbooks-bookkeepermetadata.md b/site2/docs/cookbooks-bookkeepermetadata.md
new file mode 100644
index 0000000..187cb65
--- /dev/null
+++ b/site2/docs/cookbooks-bookkeepermetadata.md
@@ -0,0 +1,20 @@
+---
+id: cookbooks-bookkeepermetadata
+title: BookKeeper Ledger Metadata
+---
+
+Pulsar stores data on BookKeeper ledgers. You can understand the contents of a ledger by
inspecting the metadata attached to it.
+Such metadata are stored on ZooKeeper and they are readable using BookKeeper APIs.
+
+Description of current metadata:
+
+| Scope  | Metadata name | Metadata value |
+| ------------- | ------------- | ------------- |
+| All ledgers  | application  | 'pulsar' |
+| All ledgers  | component  | 'managed-ledger', 'schema', 'compacted-ledger' |
+| Managed ledgers | pulsar/managed-ledger | name of the ledger |
+| Cursor | pulsar/cursor | name of the cursor |
+| Compacted topic | pulsar/compactedTopic | name of the original topic |
+| Compacted topic | pulsar/compactedTo | id of the last compacted message |
+
+
diff --git a/site2/website/sidebars.json b/site2/website/sidebars.json
index 12a2711..34d4fa9 100644
--- a/site2/website/sidebars.json
+++ b/site2/website/sidebars.json
@@ -99,7 +99,8 @@
       "cookbooks-partitioned",
       "cookbooks-retention-expiry",
       "cookbooks-encryption",
-      "cookbooks-message-queue"
+      "cookbooks-message-queue",
+      "cookbooks-bookkeepermetadata"
     ],
     "Development": [
       "develop-tools",


Mime
View raw message