pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: [bookkeeper] bump bookkeeper version to 4.9.0 (#2906)
Date Sun, 10 Feb 2019 13:33:54 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b44aaa  [bookkeeper] bump bookkeeper version to 4.9.0 (#2906)
1b44aaa is described below

commit 1b44aaa78743f8dce68ffe31c25619123c9a5a5f
Author: Sijie Guo <guosijie@gmail.com>
AuthorDate: Sun Feb 10 21:33:49 2019 +0800

    [bookkeeper] bump bookkeeper version to 4.9.0 (#2906)
    
    *Motivation*
    
    This change was part of #2714. Moving it out as a separate change to make reviews easier.
    
    *Changes*
    
    Bump bookkeeper version to 4.9.0
---
 distribution/server/src/assemble/LICENSE.bin.txt   | 83 ++++++++++++----------
 distribution/server/src/assemble/NOTICE.bin.txt    | 15 ++++
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |  7 +-
 .../bookkeeper/client/PulsarMockBookKeeper.java    |  7 +-
 .../bookkeeper/client/PulsarMockLedgerHandle.java  | 29 +++++++-
 .../mledger/impl/ManagedLedgerBkTest.java          |  6 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  1 -
 .../mledger/impl/OffloadPrefixReadTest.java        | 25 ++++++-
 .../bookkeeper/test/BookKeeperClusterTestCase.java | 10 ++-
 pom.xml                                            | 62 +++++++++++++---
 .../org/apache/pulsar/PulsarBrokerStarter.java     |  2 +-
 .../pulsar/broker/BookKeeperClientFactoryImpl.java |  3 +
 .../pulsar/compaction/TwoPhaseCompactor.java       |  3 +-
 .../broker/auth/SameThreadOrderedSafeExecutor.java | 11 ++-
 .../pulsar/compaction/CompactedTopicTest.java      |  1 -
 pulsar-functions/instance/pom.xml                  | 12 ++++
 pulsar-functions/runtime-all/pom.xml               | 10 +++
 pulsar-functions/runtime/pom.xml                   |  5 --
 pulsar-zookeeper-utils/pom.xml                     | 12 ++++
 .../ZkIsolatedBookieEnsemblePlacementPolicy.java   |  6 +-
 ...kIsolatedBookieEnsemblePlacementPolicyTest.java | 22 +++---
 src/check-binary-license                           |  3 +
 .../offload/jcloud/impl/OffloadIndexBlockImpl.java | 22 +++++-
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  |  6 --
 .../offload/jcloud/impl/OffloadIndexTest.java      | 31 ++------
 25 files changed, 274 insertions(+), 120 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 044a66f..fe82eee 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -323,7 +323,7 @@ The Apache Software License, Version 2.0
      - com.fasterxml.jackson.module-jackson-module-jaxb-annotations-2.9.7.jar
      - com.fasterxml.jackson.module-jackson-module-jsonSchema-2.9.7.jar
  * Caffeine -- com.github.ben-manes.caffeine-caffeine-2.6.2.jar
- * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-1.0.0.jar
+ * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-1.12.0.jar
  * Gson -- com.google.code.gson-gson-2.8.2.jar
  * Guava -- com.google.guava-guava-21.0.jar
  * Netty Reactive Streams -- com.typesafe.netty-netty-reactive-streams-2.0.0.jar
@@ -350,8 +350,8 @@ The Apache Software License, Version 2.0
     - org.apache.commons-commons-lang3-3.4.jar
  * Netty
     - io.netty-netty-3.10.1.Final.jar
-    - io.netty-netty-all-4.1.22.Final.jar
-    - io.netty-netty-tcnative-boringssl-static-2.0.7.Final.jar
+    - io.netty-netty-all-4.1.32.Final.jar
+    - io.netty-netty-tcnative-boringssl-static-2.0.20.Final.jar
  * Prometheus client
     - io.prometheus-simpleclient-0.5.0.jar
     - io.prometheus-simpleclient_common-0.5.0.jar
@@ -368,29 +368,32 @@ The Apache Software License, Version 2.0
     - org.apache.logging.log4j-log4j-web-2.10.0.jar
  * Java Native Access JNA -- net.java.dev.jna-jna-4.2.0.jar
  * BookKeeper
-    - org.apache.bookkeeper-bookkeeper-common-4.7.3.jar
-    - org.apache.bookkeeper-bookkeeper-proto-4.7.3.jar
-    - org.apache.bookkeeper-bookkeeper-server-4.7.3.jar
-    - org.apache.bookkeeper-circe-checksum-4.7.3.jar
-    - org.apache.bookkeeper-statelib-4.7.3.jar
-    - org.apache.bookkeeper-stream-storage-api-4.7.3.jar
-    - org.apache.bookkeeper-stream-storage-common-4.7.3.jar
-    - org.apache.bookkeeper-stream-storage-java-client-4.7.3.jar
-    - org.apache.bookkeeper-stream-storage-java-client-base-4.7.3.jar
-    - org.apache.bookkeeper-stream-storage-proto-4.7.3.jar
-    - org.apache.bookkeeper-stream-storage-server-4.7.3.jar
-    - org.apache.bookkeeper-stream-storage-service-api-4.7.3.jar
-    - org.apache.bookkeeper-stream-storage-service-impl-4.7.3.jar
-    - org.apache.bookkeeper.http-http-server-4.7.3.jar
-    - org.apache.bookkeeper.http-vertx-http-server-4.7.3.jar
-    - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.7.3.jar
-    - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.7.3.jar
-    - org.apache.bookkeeper.tests-stream-storage-tests-common-4.7.3.jar
-    - org.apache.distributedlog-distributedlog-common-4.7.3.jar
-    - org.apache.distributedlog-distributedlog-core-4.7.3-tests.jar
-    - org.apache.distributedlog-distributedlog-core-4.7.3.jar
-    - org.apache.distributedlog-distributedlog-protocol-4.7.3.jar
-    - org.apache.bookkeeper.stats-codahale-metrics-provider-4.7.3.jar
+    - org.apache.bookkeeper-bookkeeper-common-4.9.0.jar
+    - org.apache.bookkeeper-bookkeeper-common-allocator-4.9.0.jar
+    - org.apache.bookkeeper-bookkeeper-proto-4.9.0.jar
+    - org.apache.bookkeeper-bookkeeper-server-4.9.0.jar
+    - org.apache.bookkeeper-bookkeeper-tools-framework-4.9.0.jar
+    - org.apache.bookkeeper-circe-checksum-4.9.0.jar
+    - org.apache.bookkeeper-cpu-affinity-4.9.0.jar
+    - org.apache.bookkeeper-statelib-4.9.0.jar
+    - org.apache.bookkeeper-stream-storage-api-4.9.0.jar
+    - org.apache.bookkeeper-stream-storage-common-4.9.0.jar
+    - org.apache.bookkeeper-stream-storage-java-client-4.9.0.jar
+    - org.apache.bookkeeper-stream-storage-java-client-base-4.9.0.jar
+    - org.apache.bookkeeper-stream-storage-proto-4.9.0.jar
+    - org.apache.bookkeeper-stream-storage-server-4.9.0.jar
+    - org.apache.bookkeeper-stream-storage-service-api-4.9.0.jar
+    - org.apache.bookkeeper-stream-storage-service-impl-4.9.0.jar
+    - org.apache.bookkeeper.http-http-server-4.9.0.jar
+    - org.apache.bookkeeper.http-vertx-http-server-4.9.0.jar
+    - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.9.0.jar
+    - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.9.0.jar
+    - org.apache.bookkeeper.tests-stream-storage-tests-common-4.9.0.jar
+    - org.apache.distributedlog-distributedlog-common-4.9.0.jar
+    - org.apache.distributedlog-distributedlog-core-4.9.0-tests.jar
+    - org.apache.distributedlog-distributedlog-core-4.9.0.jar
+    - org.apache.distributedlog-distributedlog-protocol-4.9.0.jar
+    - org.apache.bookkeeper.stats-codahale-metrics-provider-4.9.0.jar
  * LZ4 -- org.lz4-lz4-java-1.5.0.jar
  * AsyncHttpClient
     - org.asynchttpclient-async-http-client-2.7.0.jar
@@ -423,20 +426,20 @@ The Apache Software License, Version 2.0
  * Okio - com.squareup.okio-okio-1.13.0.jar
  * Javassist -- org.javassist-javassist-3.21.0-GA.jar
   * gRPC
-    - io.grpc-grpc-all-1.16.1.jar
-    - io.grpc-grpc-auth-1.16.1.jar
-    - io.grpc-grpc-context-1.16.1.jar
-    - io.grpc-grpc-core-1.16.1.jar
-    - io.grpc-grpc-netty-1.16.1.jar
-    - io.grpc-grpc-okhttp-1.16.1.jar
-    - io.grpc-grpc-protobuf-1.16.1.jar
-    - io.grpc-grpc-protobuf-lite-1.16.1.jar
-    - io.grpc-grpc-protobuf-nano-1.16.1.jar
-    - io.grpc-grpc-stub-1.16.1.jar
+    - io.grpc-grpc-all-1.18.0.jar
+    - io.grpc-grpc-auth-1.18.0.jar
+    - io.grpc-grpc-context-1.18.0.jar
+    - io.grpc-grpc-core-1.18.0.jar
+    - io.grpc-grpc-netty-1.18.0.jar
+    - io.grpc-grpc-okhttp-1.18.0.jar
+    - io.grpc-grpc-protobuf-1.18.0.jar
+    - io.grpc-grpc-protobuf-lite-1.18.0.jar
+    - io.grpc-grpc-protobuf-nano-1.18.0.jar
+    - io.grpc-grpc-stub-1.18.0.jar
     - io.grpc-grpc-testing-1.12.0.jar
   * OpenCensus
-    - io.opencensus-opencensus-api-0.12.3.jar
-    - io.opencensus-opencensus-contrib-grpc-metrics-0.12.3.jar
+    - io.opencensus-opencensus-api-0.18.0.jar
+    - io.opencensus-opencensus-contrib-grpc-metrics-0.18.0.jar
   * Paranamer
     - com.thoughtworks.paranamer-paranamer-2.7.jar
   * Jodah
@@ -477,6 +480,8 @@ The Apache Software License, Version 2.0
     - io.jsonwebtoken-jjwt-jackson-0.10.5.jar
   * JavaX Injection
     - javax.inject-javax.inject-1.jar
+  * JCTools - Java Concurrency Tools for the JVM
+    - org.jctools-jctools-core-2.1.2.jar
   * Vertx
     - io.vertx-vertx-auth-common-3.4.1.jar 
     - io.vertx-vertx-core-3.4.1.jar
@@ -501,6 +506,8 @@ MIT License
     - org.slf4j-jcl-over-slf4j-1.7.25.jar
  * Animal Sniffer Annotations
     - org.codehaus.mojo-animal-sniffer-annotations-1.17.jar
+ * The Checker Framework
+    - org.checkerframework-checker-compat-qual-2.5.2.jar 
 
 Protocol Buffers License
  * Protocol Buffers
diff --git a/distribution/server/src/assemble/NOTICE.bin.txt b/distribution/server/src/assemble/NOTICE.bin.txt
index 1402dc5..c709968 100644
--- a/distribution/server/src/assemble/NOTICE.bin.txt
+++ b/distribution/server/src/assemble/NOTICE.bin.txt
@@ -200,3 +200,18 @@ http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html
     * Providing benchmark suite
   * Alec Wysoker
     * Performance and memory usage improvement
+
+------------------------------------------------------------------------------------
+- org.checkerframework-checker-compat-qual-2.5.2.jar
+
+A few parts of the Checker Framework have more permissive licenses.
+
+ * The annotations are licensed under the MIT License.  (The text of this
+   license appears below.)  More specifically, all the parts of the Checker
+   Framework that you might want to include with your own program use the
+   MIT License.  This is the checker-qual.jar file and all the files that
+   appear in it:  every file in a qual/ directory, plus utility files such
+   as NullnessUtil.java, RegexUtil.java, SignednessUtil.java, etc.
+   In addition, the cleanroom implementations of third-party annotations,
+   which the Checker Framework recognizes as aliases for its own
+   annotations, are licensed under the MIT License.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 0cef300..c86f7b9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -95,15 +95,10 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
 
     public void initiate() {
         ByteBuf duplicateBuffer = data.retainedDuplicate();
-        // duplicatedBuffer has refCnt=1 at this point
 
+        // internally asyncAddEntry() will take the ownership of the buffer and release it at the end
         lastInitTime = System.nanoTime();
         ledger.asyncAddEntry(duplicateBuffer, this, ctx);
-
-        // Internally, asyncAddEntry() is refCnt neutral to respect to the passed buffer and it will keep a ref on it
-        // until is done using it. We need to release this buffer here to balance the 1 refCnt added at the creation
-        // time.
-        duplicateBuffer.release();
     }
 
     public void failed(ManagedLedgerException e) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index fa49e57..f7e9cd8 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -33,11 +33,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.OpenBuilderBase;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -198,8 +200,9 @@ public class PulsarMockBookKeeper extends BookKeeper {
             public CompletableFuture<ReadHandle> execute() {
                 return getProgrammedFailure().thenCompose(
                         (res) -> {
-                            if (!validate()) {
-                                return FutureUtils.exception(new BKException.BKNoSuchLedgerExistsException());
+                            int rc = validate();
+                            if (rc != BKException.Code.OK) {
+                                return FutureUtils.exception(BKException.create(rc));
                             }
 
                             PulsarMockLedgerHandle lh = ledgers.get(ledgerId);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 0c6535d..5159851 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 
@@ -38,10 +39,14 @@ import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,8 +66,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
 
     public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
                            DigestType digest, byte[] passwd) throws GeneralSecurityException {
-        super(bk, id, new LedgerMetadata(3, 3, 2, DigestType.MAC, "".getBytes()), DigestType.MAC, "".getBytes(),
-                EnumSet.noneOf(WriteFlag.class));
+        super(bk.getClientCtx(), id, new Versioned<>(createMetadata(digest, passwd), new LongVersion(0L)),
+              digest, passwd, WriteFlag.NONE);
         this.bk = bk;
         this.id = id;
         this.digest = digest;
@@ -75,6 +80,14 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
     public void asyncClose(CloseCallback cb, Object ctx) {
         bk.getProgrammedFailure().thenComposeAsync((res) -> {
                 fenced = true;
+
+                Versioned<LedgerMetadata> current = getVersionedLedgerMetadata();
+                Versioned<LedgerMetadata> newMetadata = new Versioned<>(
+                        LedgerMetadataBuilder.from(current.getValue())
+                        .withClosedState().withLastEntryId(getLastAddConfirmed())
+                        .withLength(getLength()).build(),
+                        new LongVersion(((LongVersion)current.getVersion()).getLongVersion() + 1));
+                setLedgerMetadata(current, newMetadata);
                 return FutureUtils.value(null);
             }, bk.executor).whenCompleteAsync((res, exception) -> {
                     if (exception != null) {
@@ -242,6 +255,18 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
         return readHandle.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel);
     }
 
+    private static LedgerMetadata createMetadata(DigestType digest, byte[] passwd) {
+        List<BookieSocketAddress> ensemble = Lists.newArrayList(
+                new BookieSocketAddress("192.0.2.1", 1234),
+                new BookieSocketAddress("192.0.2.2", 1234),
+                new BookieSocketAddress("192.0.2.3", 1234));
+        return LedgerMetadataBuilder.create()
+            .withDigestType(digest.toApiDigestType())
+            .withPassword(passwd)
+            .newEnsembleEntry(0L, ensemble)
+            .build();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PulsarMockLedgerHandle.class);
 
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index a5b58d7..8d9cf8a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -223,7 +223,7 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
     public void testSimple() throws Exception {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc);
         ManagedLedgerConfig mlConfig = new ManagedLedgerConfig();
-        mlConfig.setEnsembleSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1);
+        mlConfig.setEnsembleSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1).setWriteQuorumSize(1);
         // set the data ledger size
         mlConfig.setMaxEntriesPerLedger(100);
         // set the metadata ledger size to 1 to kick off many ledger switching cases
@@ -238,7 +238,9 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
     public void testConcurrentMarkDelete() throws Exception {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc);
         ManagedLedgerConfig mlConfig = new ManagedLedgerConfig();
-        mlConfig.setEnsembleSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataAckQuorumSize(1);
+        mlConfig.setEnsembleSize(1).setWriteQuorumSize(1)
+            .setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
+            .setMetadataAckQuorumSize(1);
         // set the data ledger size
         mlConfig.setMaxEntriesPerLedger(100);
         // set the metadata ledger size to 1 to kick off many ledger switching cases
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 6ed13a5..497ae9e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -62,7 +62,6 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.ReadHandle;
-import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerEntries;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index 4bf518f..08e88ee 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Lists;
 
 import io.netty.buffer.ByteBuf;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -245,6 +246,9 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
         private final DigestType digestType;
         private final long ctime;
         private final boolean isClosed;
+        private final int metadataFormatVersion;
+        private final State state;
+        private final byte[] password;
         private final Map<String, byte[]> customMetadata;
 
         MockMetadata(LedgerMetadata toCopy) {
@@ -256,11 +260,22 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
             digestType = toCopy.getDigestType();
             ctime = toCopy.getCtime();
             isClosed = toCopy.isClosed();
-
+            metadataFormatVersion = toCopy.getMetadataFormatVersion();
+            state = toCopy.getState();
+            password = Arrays.copyOf(toCopy.getPassword(), toCopy.getPassword().length);
             customMetadata = ImmutableMap.copyOf(toCopy.getCustomMetadata());
         }
 
         @Override
+        public boolean hasPassword() { return true; }
+
+        @Override
+        public State getState() { return state; }
+
+        @Override
+        public int getMetadataFormatVersion() { return metadataFormatVersion; }
+
+        @Override
         public int getEnsembleSize() { return ensembleSize; }
 
         @Override
@@ -279,6 +294,9 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
         public DigestType getDigestType() { return digestType; }
 
         @Override
+        public byte[] getPassword() { return password; }
+
+        @Override
         public long getCtime() { return ctime; }
 
         @Override
@@ -296,5 +314,10 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
         public NavigableMap<Long, ? extends List<BookieSocketAddress>> getAllEnsembles() {
             throw new UnsupportedOperationException("Pulsar shouldn't look at this");
         }
+
+        @Override
+        public String toSafeString() {
+            return toString();
+        }
     }
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 7f90042..e27f34f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -23,6 +23,9 @@
 
 package org.apache.bookkeeper.test;
 
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -38,7 +41,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
 import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -202,12 +207,15 @@ public abstract class BookKeeperClusterTestCase {
         conf.setAllowLoopback(true);
         conf.setFlushInterval(60 * 1000);
         conf.setGcWaitTime(60 * 1000);
+        conf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
         String[] ledgerDirNames = new String[ledgerDirs.length];
         for (int i = 0; i < ledgerDirs.length; i++) {
             ledgerDirNames[i] = ledgerDirs[i].getPath();
         }
         conf.setLedgerDirNames(ledgerDirNames);
         conf.setLedgerStorageClass("org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage");
+        conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+        conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
         return conf;
     }
 
@@ -434,7 +442,7 @@ public abstract class BookKeeperClusterTestCase {
     protected BookieServer startBookie(ServerConfiguration conf, final Bookie b) throws Exception {
         BookieServer server = new BookieServer(conf) {
             @Override
-            protected Bookie newBookie(ServerConfiguration conf)
+            protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator)
                     throws IOException, KeeperException, InterruptedException, BookieException {
                 return b;
             }
diff --git a/pom.xml b/pom.xml
index ab10186..651812c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -142,9 +142,9 @@ flexible messaging model and an intuitive client API.</description>
     <!-- apache commons -->
     <commons-compress.version>1.15</commons-compress.version>
 
-    <bookkeeper.version>4.7.3</bookkeeper.version>
+    <bookkeeper.version>4.9.0</bookkeeper.version>
     <zookeeper.version>3.4.13</zookeeper.version>
-    <netty.version>4.1.22.Final</netty.version>
+    <netty.version>4.1.32.Final</netty.version>
     <storm.version>1.0.5</storm.version>
     <jetty.version>9.4.12.v20180830</jetty.version>
     <jersey.version>2.27</jersey.version>
@@ -165,8 +165,8 @@ flexible messaging model and an intuitive client API.</description>
     <protobuf2.version>2.4.1</protobuf2.version>
     <protobuf3.version>3.5.1</protobuf3.version>
     <protoc3.version>3.5.1-1</protoc3.version>
-    <grpc.version>1.16.1</grpc.version>
-    <protoc-gen-grpc-java.version>1.0.0</protoc-gen-grpc-java.version>
+    <grpc.version>1.18.0</grpc.version>
+    <protoc-gen-grpc-java.version>1.12.0</protoc-gen-grpc-java.version>
     <gson.version>2.8.2</gson.version>
     <sketches.version>0.8.3</sketches.version>
     <hbc-core.version>2.2.0</hbc-core.version>
@@ -184,7 +184,7 @@ flexible messaging model and an intuitive client API.</description>
     <scala.binary.version>2.11</scala.binary.version>
     <debezium.version>0.8.2</debezium.version>
     <jsonwebtoken.version>0.10.5</jsonwebtoken.version>
-    <opencensus.version>0.12.3</opencensus.version>
+    <opencensus.version>0.18.0</opencensus.version>
     <zstd.version>1.3.7-3</zstd.version>
     <hbase.version>1.4.9</hbase.version>
 
@@ -320,16 +320,30 @@ flexible messaging model and an intuitive client API.</description>
         <version>${reflections.version}</version>
       </dependency>
 
+      <!-- exclude the grpc version from bookkeeper and use the one defined here -->
       <dependency>
         <groupId>org.apache.bookkeeper</groupId>
         <artifactId>stream-storage-java-client</artifactId>
         <version>${bookkeeper.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-all</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
 
+      <!-- exclude the grpc version from bookkeeper and use the one defined here -->
       <dependency>
         <groupId>org.apache.bookkeeper</groupId>
         <artifactId>stream-storage-server</artifactId>
         <version>${bookkeeper.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-all</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
 
       <dependency>
@@ -404,10 +418,18 @@ flexible messaging model and an intuitive client API.</description>
         <version>${netty.version}</version>
       </dependency>
 
+      <!-- `netty-codec-http2` is a dependency of grpc, set the version to
+           be aligned with the netty version defined here -->
+      <dependency>
+        <groupId>io.netty</groupId>
+        <artifactId>netty-codec-http2</artifactId>
+        <version>${netty.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-tcnative-boringssl-static</artifactId>
-        <version>2.0.7.Final</version>
+        <version>2.0.20.Final</version>
       </dependency>
 
       <dependency>
@@ -1017,11 +1039,31 @@ flexible messaging model and an intuitive client API.</description>
         </exclusion>
         <exclusion>
           <artifactId>log4j</artifactId>
-            <groupId>log4j</groupId>
+          <groupId>log4j</groupId>
         </exclusion>
-          <exclusion>
-            <groupId>org.jboss.netty</groupId>
-            <artifactId>netty</artifactId>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-handler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-transport-native-epoll</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-tcnative-boringssl-static</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 56ae8d5..d9acecc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -44,7 +44,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
 import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.bookkeeper.common.util.ReflectionUtils;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index e0a8d09..2ed20ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -55,6 +56,8 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
         bkConf.setNumChannelsPerBookie(16);
         bkConf.setUseV2WireProtocol(conf.isBookkeeperUseV2WireProtocol());
         bkConf.setEnableDigestTypeAutodetection(true);
+
+        bkConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
         if (conf.isBookkeeperClientHealthCheckEnabled()) {
             bkConf.enableBookieHealthCheck();
             bkConf.setBookieHealthCheckInterval(conf.getBookkeeperHealthCheckIntervalSec(), TimeUnit.SECONDS);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 612b336..d7e9bb1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -324,7 +324,6 @@ public class TwoPhaseCompactor extends Compactor {
                                  bkf.complete(null);
                              }
                          }, null);
-        serialized.release();
         return bkf;
     }
 
@@ -359,4 +358,4 @@ public class TwoPhaseCompactor extends Compactor {
             this.latestForKey = latestForKey;
         }
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
index 2909857..3f74804 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
@@ -27,7 +27,16 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 public class SameThreadOrderedSafeExecutor extends OrderedExecutor {
 
     public SameThreadOrderedSafeExecutor() {
-        super("same-thread-executor", 1, new DefaultThreadFactory("test"), NullStatsLogger.INSTANCE, false, 100000, 10);
+        super(
+            "same-thread-executor",
+            1,
+            new DefaultThreadFactory("test"),
+            NullStatsLogger.INSTANCE,
+            false,
+            false,
+            100000,
+            10,
+            false);
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 547c044..444d3e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -126,7 +126,6 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
                                          f.complete(null);
                                      }
                                 }, null);
-                        buffer.release();
                         return f;
                     }).toArray(CompletableFuture[]::new)).get();
         lh.close();
diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index f734b03..527a28a 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -80,6 +80,18 @@
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>stream-storage-java-client</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>io.grpc</groupId>
+          <artifactId>grpc-all</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- `grpc-all` is excluded from `stream-storage-server` at root pom file -->
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-all</artifactId>
     </dependency>
 
     <dependency>
diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml
index 98bb46d..8d27708 100644
--- a/pulsar-functions/runtime-all/pom.xml
+++ b/pulsar-functions/runtime-all/pom.xml
@@ -59,6 +59,16 @@
       <artifactId>pulsar-client-original</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-all</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index f58999c..9aca775 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -46,11 +46,6 @@
     </dependency>
 
     <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-all</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.beust</groupId>
       <artifactId>jcommander</artifactId>
     </dependency>
diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml
index dcd06dc..35b39c7 100644
--- a/pulsar-zookeeper-utils/pom.xml
+++ b/pulsar-zookeeper-utils/pom.xml
@@ -47,6 +47,18 @@
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>stream-storage-server</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>io.grpc</groupId>
+          <artifactId>grpc-all</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- `grpc-all` is excluded from `stream-storage-server` at root pom file -->
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-all</artifactId>
     </dependency>
 
     <dependency>
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index 877378f..7d9b53e 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -109,7 +109,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+    public PlacementResult<List<BookieSocketAddress>> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
             Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         Set<BookieSocketAddress> blacklistedBookies = getBlacklistedBookies();
@@ -121,8 +121,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> currentEnsemble,
+    public PlacementResult<BookieSocketAddress> replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+            Map<String, byte[]> customMetadata, List<BookieSocketAddress> currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         Set<BookieSocketAddress> blacklistedBookies = getBlacklistedBookies();
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
index eef8e34..3a807e8 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
@@ -119,12 +119,12 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
         isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE);
         isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
 
-        ArrayList<BookieSocketAddress> ensemble = isolationPolicy.newEnsemble(3, 3, 2, Collections.emptyMap(), new HashSet<>());
+        List<BookieSocketAddress> ensemble = isolationPolicy.newEnsemble(3, 3, 2, Collections.emptyMap(), new HashSet<>()).getResult();
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));
 
-        ensemble = isolationPolicy.newEnsemble(1, 1, 1, Collections.emptyMap(), new HashSet<>());
+        ensemble = isolationPolicy.newEnsemble(1, 1, 1, Collections.emptyMap(), new HashSet<>()).getResult();
         assertFalse(ensemble.contains(new BookieSocketAddress(BOOKIE3)));
 
         try {
@@ -136,7 +136,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
 
         Set<BookieSocketAddress> bookieToExclude = new HashSet<>();
         bookieToExclude.add(new BookieSocketAddress(BOOKIE1));
-        ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), bookieToExclude);
+        ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), bookieToExclude).getResult();
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
 
@@ -148,7 +148,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
 
         Thread.sleep(100);
 
-        ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), null);
+        ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), null).getResult();
 
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
@@ -161,18 +161,18 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
         }
 
         try {
-            isolationPolicy.replaceBookie(3, 3, 3, Collections.emptyMap(), new HashSet<>(ensemble),
+            isolationPolicy.replaceBookie(3, 3, 3, Collections.emptyMap(), ensemble,
                     new BookieSocketAddress(BOOKIE5), new HashSet<>());
             fail("should not pass");
         } catch (BKNotEnoughBookiesException e) {
             // ok
         }
-
         bookieToExclude = new HashSet<>();
         bookieToExclude.add(new BookieSocketAddress(BOOKIE1));
-        ensemble = isolationPolicy.newEnsemble(1, 1, 1, Collections.emptyMap(), bookieToExclude);
+
+        ensemble = isolationPolicy.newEnsemble(1, 1, 1, Collections.emptyMap(), bookieToExclude).getResult();
         BookieSocketAddress chosenBookie = isolationPolicy.replaceBookie(1, 1, 1, Collections.emptyMap(),
-                new HashSet<>(ensemble), new BookieSocketAddress(BOOKIE5), new HashSet<>());
+                ensemble, ensemble.get(0), new HashSet<>()).getResult();
         assertTrue(chosenBookie.equals(new BookieSocketAddress(BOOKIE1)));
 
         localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
@@ -201,7 +201,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
 
         Thread.sleep(100);
 
-        ArrayList<BookieSocketAddress> ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>());
+        List<BookieSocketAddress> ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>()).getResult();
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
 
@@ -242,7 +242,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
         isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE);
         isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
 
-        ArrayList<BookieSocketAddress> ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>());
+        List<BookieSocketAddress> ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>()).getResult();
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
 
@@ -264,7 +264,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
         // wait for the zk to notify and update the mappings
         Thread.sleep(100);
 
-        ensemble = isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(), new HashSet<>());
+        ensemble = isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(), new HashSet<>()).getResult();
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE3)));
diff --git a/src/check-binary-license b/src/check-binary-license
index de817da..2ab5f8d 100755
--- a/src/check-binary-license
+++ b/src/check-binary-license
@@ -76,6 +76,9 @@ done
 
 # Check all jars mentioned in NOTICE are bundled
 for J in $NOTICEJARS; do
+    if [ $J == "checker-qual.jar" ]; then
+        continue
+    fi
     echo "$JARS" | grep -q $J
     if [ $? != 0 ]; then
         echo $J mentioned in NOTICE, but not bundled
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
index 5e8fa0b..7c837fd 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
@@ -45,7 +45,6 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.DataFormats;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
-import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
 import org.slf4j.Logger;
@@ -214,6 +213,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
         private long length;
         private DataFormats.LedgerMetadataFormat.DigestType digestType;
         private long ctime;
+        private byte[] password;
         private State state;
         private Map<String, byte[]> customMetadata = Maps.newHashMap();
         private TreeMap<Long, ArrayList<BookieSocketAddress>> ensembles = new TreeMap<Long, ArrayList<BookieSocketAddress>>();
@@ -226,7 +226,8 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
             this.length = ledgerMetadataFormat.getLength();
             this.digestType = ledgerMetadataFormat.getDigestType();
             this.ctime = ledgerMetadataFormat.getCtime();
-            this.state = ledgerMetadataFormat.getState();
+            this.state = State.CLOSED;
+            this.password = ledgerMetadataFormat.getPassword().toByteArray();
 
             if (ledgerMetadataFormat.getCustomMetadataCount() > 0) {
                 ledgerMetadataFormat.getCustomMetadataList().forEach(
@@ -247,6 +248,18 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
         }
 
         @Override
+        public boolean hasPassword() { return true; }
+
+        @Override
+        public byte[] getPassword() { return password; }
+
+        @Override
+        public State getState() { return state; }
+
+        @Override
+        public int getMetadataFormatVersion() { return 2; }
+
+        @Override
         public int getEnsembleSize() {
             return this.ensembleSize;
         }
@@ -311,6 +324,11 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
         public NavigableMap<Long, ? extends List<BookieSocketAddress>> getAllEnsembles() {
             return this.ensembles;
         }
+
+        @Override
+        public String toSafeString() {
+            return toString();
+        }
     }
 
     private static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws IOException {
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 8fe170d..47cd590 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -42,7 +42,6 @@ import java.util.concurrent.ExecutionException;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -121,11 +120,6 @@ class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase {
             i++;
         }
 
-        // workaround mock not closing metadata correctly
-        Method close = LedgerMetadata.class.getDeclaredMethod("close", long.class);
-        close.setAccessible(true);
-        close.invoke(lh.getLedgerMetadata(), lh.getLastAddConfirmed());
-
         lh.close();
 
         return bk.newOpenLedgerOp().withLedgerId(lh.getId())
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
index 447efc9..e344c43 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
@@ -33,7 +33,8 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
+import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
@@ -62,24 +63,6 @@ public class OffloadIndexTest {
         assertEquals(entry2.getDataOffset(), 1254L);
     }
 
-
-    // use mock to setLastEntryId
-    public static class LedgerMetadataMock extends org.apache.bookkeeper.client.LedgerMetadata {
-        long lastId = 0;
-        public LedgerMetadataMock(int ensembleSize, int writeQuorumSize, int ackQuorumSize, org.apache.bookkeeper.client.BookKeeper.DigestType digestType, byte[] password, Map<String, byte[]> customMetadata, boolean storeSystemtimeAsLedgerCreationTime) {
-            super(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, password, customMetadata, storeSystemtimeAsLedgerCreationTime);
-        }
-
-        @Override
-        public long getLastEntryId(){
-            return  lastId;
-        }
-
-        public void setLastEntryId(long lastId) {
-            this.lastId = lastId;
-        }
-    }
-
     private LedgerMetadata createLedgerMetadata() throws Exception {
 
         Map<String, byte[]> metadataCustom = Maps.newHashMap();
@@ -94,12 +77,10 @@ public class OffloadIndexTest {
         bookies.add(1, BOOKIE2);
         bookies.add(2, BOOKIE3);
 
-        LedgerMetadataMock metadata = new LedgerMetadataMock(3, 3, 2,
-            DigestType.CRC32C, "password".getBytes(UTF_8), metadataCustom, false);
-
-        metadata.addEnsemble(0, bookies);
-        metadata.setLastEntryId(5000);
-        return metadata;
+        return LedgerMetadataBuilder.create().withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
+            .withDigestType(DigestType.CRC32C).withPassword("password".getBytes(UTF_8))
+            .withCustomMetadata(metadataCustom).withClosedState().withLastEntryId(5000).withLength(100)
+            .newEnsembleEntry(0L, bookies).build();
     }
 
     // prepare metadata, then use builder to build a OffloadIndexBlockImpl


Mime
View raw message