pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Add tiered storage support for Pulsar SQL (#4045)
Date Tue, 16 Apr 2019 00:01:14 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 76aacd3  Add tiered storage support for Pulsar SQL (#4045)
76aacd3 is described below

commit 76aacd3c3477342d9cb6709098cd4a2724b78f77
Author: Boyang Jerry Peng <jerry.boyang.peng@gmail.com>
AuthorDate: Mon Apr 15 17:01:09 2019 -0700

    Add tiered storage support for Pulsar SQL (#4045)
    
    * Adding offloader support for sql
    
    * cleaning up
    
    * cleaning up imports
    
    * cleaning up configs
    
    * fix imports
    
    * fix behavior when offloader not configured and fix license
    
    * fix unit test
---
 conf/presto/catalog/pulsar.properties              |  18 ++++
 .../apache/bookkeeper/mledger/LedgerOffloader.java |   4 +
 .../mledger/impl/ReadOnlyCursorImpl.java           |   5 +
 .../bookkeeper/mledger/offload/OffloaderUtils.java |   7 +-
 .../org/apache/pulsar/broker/PulsarService.java    |   9 +-
 .../apache/pulsar/common/nar/NarClassLoader.java   |  16 +++-
 pulsar-sql/presto-distribution/LICENSE             |  73 ++++++++++++++-
 pulsar-sql/presto-pulsar/pom.xml                   | 104 ++++++++++++++++-----
 .../pulsar/sql/presto/AvroSchemaHandler.java       |   8 +-
 .../pulsar/sql/presto/JSONSchemaHandler.java       |   4 +-
 .../pulsar/sql/presto/PulsarConnectorCache.java    |  93 ++++++++++++++++--
 .../pulsar/sql/presto/PulsarConnectorConfig.java   |  52 ++++++++++-
 .../sql/presto/PulsarConnectorMetricsTracker.java  |  10 +-
 .../pulsar/sql/presto/PulsarConnectorUtils.java    |  10 ++
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  46 ++++++---
 .../pulsar/sql/presto/PulsarSplitManager.java      |  14 ++-
 .../apache/pulsar/sql/presto/SchemaHandler.java    |   2 +-
 .../pulsar/sql/presto/TestPulsarConnector.java     |  36 ++++---
 .../pulsar/sql/presto/TestPulsarMetadata.java      |   4 +-
 19 files changed, 418 insertions(+), 97 deletions(-)

diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties
index 77b22dc..7f191e5 100644
--- a/conf/presto/catalog/pulsar.properties
+++ b/conf/presto/catalog/pulsar.properties
@@ -31,3 +31,21 @@ pulsar.target-num-splits=2
 pulsar.max-split-message-queue-size=10000
 # max entry queue size
 pulsar.max-split-entry-queue-size = 1000
+
+
+####### TIERED STORAGE OFFLOADER CONFIGS #######
+
+## Driver to use to offload old data to long term storage
+#pulsar.managed-ledger-offload-driver = aws-s3
+
+## The directory to locate offloaders
+#pulsar.offloaders-directory = /pulsar/offloaders
+
+## Maximum number of thread pool threads for ledger offloading
+#pulsar.managed-ledger-offload-max-threads = 2
+
+## Properties and configurations related to specific offloader implementation
+#pulsar.offloader-properties = \
+#  {"s3ManagedLedgerOffloadBucket": "offload-bucket", \
+#  "s3ManagedLedgerOffloadRegion": "us-west-2", \
+#  "s3ManagedLedgerOffloadServiceEndpoint": "http://s3.amazonaws.com"}
\ No newline at end of file
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index 8fc35cc..c85fe9f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -33,6 +33,10 @@ import org.apache.bookkeeper.client.api.ReadHandle;
 @Beta
 public interface LedgerOffloader {
 
+    // TODO: improve the user metadata in subsequent changes
+    String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
+    String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";
+
     /**
      * Get offload driver name.
      *
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
index c1a2216..8c9da1d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 
 @Slf4j
 public class ReadOnlyCursorImpl extends ManagedCursorImpl implements ReadOnlyCursor {
@@ -62,6 +63,10 @@ public class ReadOnlyCursorImpl extends ManagedCursorImpl implements ReadOnlyCur
         callback.closeComplete(ctx);
     }
 
+    public MLDataFormats.ManagedLedgerInfo.LedgerInfo getCurrentLedgerInfo() {
+        return this.ledger.getLedgersInfo().get(this.readPosition.getLedgerId());
+    }
+
     public long getNumberOfEntries(Range<PositionImpl> range) {
         return this.ledger.getNumberOfEntries(range);
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
index 8726704..845b53f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
@@ -49,7 +49,10 @@ public class OffloaderUtils {
      * @throws IOException when fail to retrieve the pulsar offloader class
      */
     static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String narPath) throws IOException {
-        NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet());
+        // need to load offloader NAR to the classloader that also loaded LedgerOffloaderFactory in case
+        // LedgerOffloaderFactory is loaded by a classloader that is not the default classloader
+        // as is the case for the pulsar presto plugin
+        NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), LedgerOffloaderFactory.class.getClassLoader());
         String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME);
 
         OffloaderDefinition conf = ObjectMapperFactory.getThreadLocalYaml()
@@ -66,8 +69,6 @@ public class OffloaderUtils {
             CompletableFuture<LedgerOffloaderFactory> loadFuture = new CompletableFuture<>();
             Thread loadingThread = new Thread(() -> {
                 Thread.currentThread().setContextClassLoader(ncl);
-
-                log.info("Loading offloader factory {} using class loader {}", factoryClass, ncl);
                 try {
                     Object offloader = factoryClass.newInstance();
                     if (!(offloader instanceof LedgerOffloaderFactory)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3cf442a..52c43c4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -692,11 +692,6 @@ public class PulsarService implements AutoCloseable {
         return offloader;
     }
 
-    // TODO: improve the user metadata in subsequent changes
-    static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
-    static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";
-
-
     public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf)
             throws PulsarServerException {
         try {
@@ -711,8 +706,8 @@ public class PulsarService implements AutoCloseable {
                     return offloaderFactory.create(
                         conf.getProperties(),
                         ImmutableMap.of(
-                            METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
-                            METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
+                            LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
+                            LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
                         ),
                         getOffloaderScheduler(conf));
                 } catch (IOException ioe) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index 1ba7ae7..0b78344 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -144,7 +144,16 @@ public class NarClassLoader extends URLClassLoader {
     public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars) throws IOException {
         File unpacked = NarUnpacker.unpackNar(narPath, NAR_CACHE_DIR);
         try {
-            return new NarClassLoader(unpacked, additionalJars);
+            return new NarClassLoader(unpacked, additionalJars, NarClassLoader.class.getClassLoader() );
+        } catch (ClassNotFoundException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars, ClassLoader parent) throws IOException {
+        File unpacked = NarUnpacker.unpackNar(narPath, NAR_CACHE_DIR);
+        try {
+            return new NarClassLoader(unpacked, additionalJars, parent);
         } catch (ClassNotFoundException e) {
             throw new IOException(e);
         }
@@ -155,6 +164,7 @@ public class NarClassLoader extends URLClassLoader {
      *
      * @param narWorkingDirectory
      *            directory to explode nar contents to
+     * @param parent
      * @throws IllegalArgumentException
      *             if the NAR is missing the Java Services API file for <tt>FlowFileProcessor</tt> implementations.
      * @throws ClassNotFoundException
@@ -163,9 +173,9 @@ public class NarClassLoader extends URLClassLoader {
      * @throws IOException
      *             if an error occurs while loading the NAR.
      */
-    private NarClassLoader(final File narWorkingDirectory, Set<String> additionalJars)
+    private NarClassLoader(final File narWorkingDirectory, Set<String> additionalJars, ClassLoader parent)
             throws ClassNotFoundException, IOException {
-        super(new URL[0]);
+        super(new URL[0], parent);
         this.narWorkingDirectory = narWorkingDirectory;
 
         // process the classpath
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index b2a9830..d61b2fa 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -235,6 +235,23 @@ The Apache Software License, Version 2.0
     - commons-lang3-3.4.jar
  * Netty
     - netty-3.6.2.Final.jar
+    - netty-all-4.1.32.Final.jar
+    - netty-buffer-4.1.31.Final.jar
+    - netty-codec-4.1.31.Final.jar
+    - netty-codec-dns-4.1.33.Final.jar
+    - netty-codec-http-4.1.33.Final.jar
+    - netty-codec-socks-4.1.33.Final.jar
+    - netty-common-4.1.31.Final.jar
+    - netty-handler-4.1.31.Final.jar
+    - netty-handler-proxy-4.1.33.Final.jar
+    - netty-reactive-streams-2.0.0.jar
+    - netty-resolver-4.1.31.Final.jar
+    - netty-resolver-dns-4.1.33.Final.jar
+    - netty-tcnative-boringssl-static-2.0.20.Final.jar
+    - netty-transport-4.1.31.Final.jar
+    - netty-transport-native-epoll-4.1.31.Final.jar
+    - netty-transport-native-epoll-4.1.33.Final-linux-x86_64.jar
+    - netty-transport-native-unix-common-4.1.31.Final.jar
  * Joda Time
     - joda-time-2.9.9.jar
  * Jetty
@@ -252,8 +269,6 @@ The Apache Software License, Version 2.0
     - jetty-server-9.4.11.v20180605.jar
     - jetty-servlet-9.4.11.v20180605.jar
     - jetty-util-9.4.11.v20180605.jar
-  * Javassist
-    - javassist-3.22.0-CR2.jar
   * Asynchronous Http Client
     - async-http-client-1.6.5.jar
   * Apache BVal
@@ -372,7 +387,6 @@ The Apache Software License, Version 2.0
     - rocksdbjni-5.13.3.jar
   * SnakeYAML
     - snakeyaml-1.17.jar
-    - snakeyaml-1.23.jar
   * Snappy Java
     - snappy-java-1.1.1.3.jar
   * Bean Validation API
@@ -392,10 +406,59 @@ The Apache Software License, Version 2.0
     - lz4-java-1.5.0.jar
   * JCTools
     - jctools-core-2.1.2.jar
+  * Asynchronous Http Client
+    - async-http-client-2.7.0.jar
+    - async-http-client-netty-utils-2.7.0.jar
+  * Apache Bookkeeper
+    - bookkeeper-common-4.9.0.jar
+    - bookkeeper-common-allocator-4.9.0.jar
+    - bookkeeper-proto-4.9.0.jar
+    - bookkeeper-server-4.9.0.jar
+    - bookkeeper-stats-api-4.9.0.jar
+    - bookkeeper-tools-framework-4.9.0.jar
+    - circe-checksum-4.9.0.jar
+    - codahale-metrics-provider-4.9.0.jar
+    - cpu-affinity-4.9.0.jar
+    - http-server-4.9.0.jar
+    - prometheus-metrics-provider-4.9.0.jar
+  * Apache Commons
+    - commons-cli-1.2.jar
+    - commons-codec-1.10.jar
+    - commons-collections4-4.1.jar
+    - commons-configuration-1.10.jar
+    - commons-io-2.5.jar
+    - commons-lang-2.6.jar
+    - commons-logging-1.1.1.jar
+  * GSON
+    - gson-2.8.2.jar
+  * Jackson
+    - jackson-jaxrs-base-2.8.11.jar
+    - jackson-jaxrs-json-provider-2.8.11.jar
+    - jackson-module-jaxb-annotations-2.8.11.jar
+    - jackson-module-jsonSchema-2.8.11.jar
+  * Java Assist
+    - javassist-3.21.0-GA.jar
+  * Jetty
+    - jetty-http-9.4.12.v20180830.jar
+    - jetty-io-9.4.12.v20180830.jar
+    - jetty-security-9.4.12.v20180830.jar
+    - jetty-server-9.4.12.v20180830.jar
+    - jetty-servlet-9.4.12.v20180830.jar
+    - jetty-util-9.4.12.v20180830.jar
+  * Java Native Access
+    - jna-4.2.0.jar
+  * Yahoo Datasketches
+    - memory-0.8.3.jar
+    - sketches-core-0.8.3.jar
+  * Apache Zookeeper
+    - zookeeper-3.4.13.jar
+  * Apache Yetus Audience Annotations
+    - audience-annotations-0.5.0.jar
 
 Protocol Buffers License
  * Protocol Buffers
    - protobuf-shaded-2.1.0-incubating.jar
+   - protobuf-java-3.5.1.jar
 
 BSD 3-clause "New" or "Revised" License
   *  RE2J TD -- re2j-td-1.4.jar
@@ -476,6 +539,8 @@ CDDL-1.1 -- licenses/LICENSE-CDDL-1.1.txt
      - jgrapht-core-0.9.0.jar
   * Logback Core Module
     - logback-core-1.2.3.jar
+  * MIME Streaming Extension
+    - mimepull-1.9.6.jar
 
 Public Domain (CC0) -- licenses/LICENSE-CC0.txt
  * HdrHistogram
@@ -484,6 +549,8 @@ Public Domain (CC0) -- licenses/LICENSE-CC0.txt
    - aopalliance-1.0.jar
  * XZ For Java
     - xz-1.5.jar
+ * Reactive Streams
+    - reactive-streams-1.0.2.jar
 
 Bouncy Castle License
  * Bouncy Castle -- licenses/LICENSE-bouncycastle.txt
diff --git a/pulsar-sql/presto-pulsar/pom.xml b/pulsar-sql/presto-pulsar/pom.xml
index 795d689..30bb88b 100644
--- a/pulsar-sql/presto-pulsar/pom.xml
+++ b/pulsar-sql/presto-pulsar/pom.xml
@@ -34,11 +34,6 @@
 
     <properties>
         <dep.airlift.version>0.170</dep.airlift.version>
-        <dep.slice.version>0.35</dep.slice.version>
-        <dep.guice.version>4.2.0</dep.guice.version>
-        <dep.javax-validation.version>1.1.0.Final</dep.javax-validation.version>
-        <dep.javax-inject.version>1</dep.javax-inject.version>
-        <dep.guava.version>24.1-jre</dep.guava.version>
         <jctools.version>2.1.2</jctools.version>
         <dslJson.verson>1.8.4</dslJson.verson>
     </properties>
@@ -57,24 +52,6 @@
         </dependency>
 
         <dependency>
-            <groupId>com.google.inject</groupId>
-            <artifactId>guice</artifactId>
-            <version>${dep.guice.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>javax.validation</groupId>
-            <artifactId>validation-api</artifactId>
-            <version>${dep.javax-validation.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>javax.inject</groupId>
-            <artifactId>javax.inject</artifactId>
-            <version>${dep.javax-inject.version}</version>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
             <version>${avro.version}</version>
@@ -82,13 +59,13 @@
 
         <dependency>
             <groupId>org.apache.pulsar</groupId>
-            <artifactId>pulsar-client-admin</artifactId>
+            <artifactId>pulsar-client-admin-original</artifactId>
             <version>${project.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.pulsar</groupId>
-            <artifactId>managed-ledger</artifactId>
+            <artifactId>managed-ledger-original</artifactId>
             <version>${project.version}</version>
         </dependency>
 
@@ -127,4 +104,81 @@
 
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>true</createDependencyReducedPom>
+                            <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.pulsar:pulsar-client-original</include>
+                                    <include>org.apache.pulsar:pulsar-client-admin-original</include>
+                                    <include>org.apache.pulsar:managed-ledger-original</include>
+
+                                    <include>org.glassfish.jersey*:*</include>
+                                    <include>javax.ws.rs:*</include>
+                                    <include>javax.annotation:*</include>
+                                    <include>org.glassfish.hk2*:*</include>
+
+                                    <include>org.apache.httpcomponents:*</include>
+                                    <include>org.eclipse.jetty:*</include>
+
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>org.apache.pulsar:pulsar-client-original</artifact>
+                                    <includes>
+                                        <include>**</include>
+                                    </includes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.glassfish</pattern>
+                                    <shadedPattern>org.apache.pulsar.shade.org.glassfish</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>javax.ws</pattern>
+                                    <shadedPattern>org.apache.pulsar.shade.javax.ws</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>javax.annotation</pattern>
+                                    <shadedPattern>org.apache.pulsar.shade.javax.annotation</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>jersey</pattern>
+                                    <shadedPattern>org.apache.pulsar.shade.jersey</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.eclipse.jetty</pattern>
+                                    <shadedPattern>org.apache.pulsar.shade.org.eclipse.jetty</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.http</pattern>
+                                    <shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+                                </relocation>
+
+                            </relocations>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>
\ No newline at end of file
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
index 41c2f6f..5d55682 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
@@ -20,10 +20,10 @@ package org.apache.pulsar.sql.presto;
 
 import io.airlift.log.Logger;
 
-import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
-import org.apache.pulsar.shade.io.netty.buffer.ByteBufAllocator;
-import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
-import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
index ae1a7c4..5a12d30 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
@@ -28,8 +28,8 @@ import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
-import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.FastThreadLocal;
 
 public class JSONSchemaHandler implements SchemaHandler {
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index ee775bb..a9cd070 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -18,19 +18,44 @@
  */
 package org.apache.pulsar.sql.presto;
 
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import com.google.common.collect.ImmutableMap;
+import io.airlift.log.Logger;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
-import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
+import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
+import org.apache.bookkeeper.mledger.offload.Offloaders;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.PulsarVersion;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
 
 public class PulsarConnectorCache {
 
+    private static final Logger log = Logger.get(PulsarConnectorCache.class);
+
     private static PulsarConnectorCache instance;
 
     private final ManagedLedgerFactory managedLedgerFactory;
 
     private final StatsProvider statsProvider;
+    private OrderedScheduler offloaderScheduler;
+    private Offloaders offloaderManager;
+    private LedgerOffloader offloader;
+
+    private static final String OFFLOADERS_DIRECTOR = "offloadersDirectory";
+    private static final String MANAGED_LEDGER_OFFLOAD_DRIVER = "managedLedgerOffloadDriver";
+    private static final String MANAGED_LEDGER_OFFLOAD_MAX_THREADS = "managedLedgerOffloadMaxThreads";
+
 
     private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
         this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
@@ -43,6 +68,8 @@ public class PulsarConnectorCache {
         pulsarConnectorConfig.getStatsProviderConfigs().forEach((key, value) -> clientConfiguration.setProperty(key, value));
 
         this.statsProvider.start(clientConfiguration);
+
+        this.offloader = initManagedLedgerOffloader(pulsarConnectorConfig);
     }
 
     public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
@@ -57,8 +84,6 @@ public class PulsarConnectorCache {
     private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
         ClientConfiguration bkClientConfiguration = new ClientConfiguration()
                 .setZkServers(pulsarConnectorConfig.getZookeeperUri())
-                .setAllowShadedLedgerManagerFactoryClass(true)
-                .setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.")
                 .setClientTcpNoDelay(false)
                 .setUseV2WireProtocol(true)
                 .setStickyReadsEnabled(true)
@@ -66,6 +91,58 @@ public class PulsarConnectorCache {
         return new ManagedLedgerFactoryImpl(bkClientConfiguration);
     }
 
+    public ManagedLedgerConfig getManagedLedgerConfig() {
+
+        return new ManagedLedgerConfig()
+                .setLedgerOffloader(this.offloader);
+    }
+
+    private synchronized OrderedScheduler getOffloaderScheduler(PulsarConnectorConfig pulsarConnectorConfig) {
+        if (this.offloaderScheduler == null) {
+            this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
+                    .numThreads(pulsarConnectorConfig.getManagedLedgerOffloadMaxThreads())
+                    .name("pulsar-offloader").build();
+        }
+        return this.offloaderScheduler;
+    }
+
+    private LedgerOffloader initManagedLedgerOffloader(PulsarConnectorConfig conf) {
+
+        try {
+            if (StringUtils.isNotBlank(conf.getManagedLedgerOffloadDriver())) {
+                checkNotNull(conf.getOffloadersDirectory(),
+                        "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
+                        conf.getManagedLedgerOffloadDriver());
+                this.offloaderManager = OffloaderUtils.searchForOffloaders(conf.getOffloadersDirectory());
+                LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
+                        conf.getManagedLedgerOffloadDriver());
+
+                Map<String, String> offloaderProperties = conf.getOffloaderProperties();
+                offloaderProperties.put(OFFLOADERS_DIRECTOR, conf.getOffloadersDirectory());
+                offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_DRIVER, conf.getManagedLedgerOffloadDriver());
+                offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(conf.getManagedLedgerOffloadMaxThreads()));
+
+                try {
+                    return offloaderFactory.create(
+                            PulsarConnectorUtils.getProperties(offloaderProperties),
+                            ImmutableMap.of(
+                                    LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
+                                    LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
+                            ),
+                            getOffloaderScheduler(conf));
+                } catch (IOException ioe) {
+                    log.error("Failed to create offloader: ", ioe);
+                    throw new RuntimeException(ioe.getMessage(), ioe.getCause());
+                }
+            } else {
+                log.info("No ledger offloader configured, using NULL instance");
+                return NullLedgerOffloader.INSTANCE;
+            }
+        } catch (Throwable t) {
+            throw new RuntimeException(t);
+        }
+    }
+
     public ManagedLedgerFactory getManagedLedgerFactory() {
         return managedLedgerFactory;
     }
@@ -74,11 +151,13 @@ public class PulsarConnectorCache {
         return statsProvider;
     }
 
-    public static void shutdown() throws ManagedLedgerException, InterruptedException {
+    public static void shutdown() throws Exception {
         synchronized (PulsarConnectorCache.class) {
             if (instance != null) {
-                instance.managedLedgerFactory.shutdown();
                 instance.statsProvider.stop();
+                instance.managedLedgerFactory.shutdown();
+                instance.offloaderScheduler.shutdown();
+                instance.offloaderManager.close();
                 instance = null;
             }
         }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 34d332e..2399233 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -22,7 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import io.airlift.configuration.Config;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.NullStatsProvider;
 
 import javax.validation.constraints.NotNull;
 import java.io.IOException;
@@ -39,6 +39,13 @@ public class PulsarConnectorConfig implements AutoCloseable {
     private int maxSplitEntryQueueSize = 1000;
     private String statsProvider = NullStatsProvider.class.getName();
     private Map<String, String> statsProviderConfigs = new HashMap<>();
+
+    /**** --- Ledger Offloading --- ****/
+    private String managedLedgerOffloadDriver = null;
+    private int managedLedgerOffloadMaxThreads = 2;
+    private String offloadersDirectory = "./offloaders";
+    private Map<String, String> offloaderProperties = new HashMap<>();
+
     private PulsarAdmin pulsarAdmin;
 
     @NotNull
@@ -129,6 +136,49 @@ public class PulsarConnectorConfig implements AutoCloseable {
         return this;
     }
 
+    /**** --- Ledger Offloading --- ****/
+
+    public int getManagedLedgerOffloadMaxThreads() {
+        return this.managedLedgerOffloadMaxThreads;
+    }
+
+    @Config("pulsar.managed-ledger-offload-max-threads")
+    public PulsarConnectorConfig setManagedLedgerOffloadMaxThreads(int managedLedgerOffloadMaxThreads) throws IOException {
+        this.managedLedgerOffloadMaxThreads = managedLedgerOffloadMaxThreads;
+        return this;
+    }
+
+    public String getManagedLedgerOffloadDriver() {
+        return this.managedLedgerOffloadDriver;
+    }
+
+    @Config("pulsar.managed-ledger-offload-driver")
+    public PulsarConnectorConfig setManagedLedgerOffloadDriver(String managedLedgerOffloadDriver) throws IOException {
+        this.managedLedgerOffloadDriver = managedLedgerOffloadDriver;
+        return this;
+    }
+
+    public String getOffloadersDirectory() {
+        return this.offloadersDirectory;
+    }
+
+
+    @Config("pulsar.offloaders-directory")
+    public PulsarConnectorConfig setOffloadersDirectory(String offloadersDirectory) throws IOException {
+        this.offloadersDirectory = offloadersDirectory;
+        return this;
+    }
+
+    public Map<String, String> getOffloaderProperties() {
+        return this.offloaderProperties;
+    }
+
+    @Config("pulsar.offloader-properties")
+    public PulsarConnectorConfig setOffloaderProperties(String offloaderProperties) throws IOException {
+        this.offloaderProperties = new ObjectMapper().readValue(offloaderProperties, Map.class);
+        return this;
+    }
+
     @NotNull
     public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
         if (this.pulsarAdmin == null) {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
index d62a788..34969d8 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
@@ -18,11 +18,11 @@
  */
 package org.apache.pulsar.sql.presto;
 
-import org.apache.pulsar.shade.org.apache.bookkeeper.stats.Counter;
-import org.apache.pulsar.shade.org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
-import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
 
 import java.util.concurrent.TimeUnit;
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
index 520ee68..ee256f6 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
@@ -26,6 +26,8 @@ import org.apache.pulsar.common.naming.TopicName;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import java.util.Properties;
 
 public class PulsarConnectorUtils {
 
@@ -73,4 +75,12 @@ public class PulsarConnectorUtils {
             throw new RuntimeException("User class constructor throws exception", e);
         }
     }
+
+    public static Properties getProperties(Map<String, String> configMap) {
+        Properties properties = new Properties();
+        for (Map.Entry<String, String> entry : configMap.entrySet()) {
+            properties.setProperty(entry.getKey(), entry.getValue());
+        }
+        return properties;
+    }
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 9427f20..4ffbd2f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -56,6 +56,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
 import org.apache.pulsar.common.api.raw.MessageParser;
 import org.apache.pulsar.common.api.raw.RawMessage;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -83,6 +84,7 @@ public class PulsarRecordCursor implements RecordCursor {
     private DeserializeEntries deserializeEntries;
     private TopicName topicName;
     private PulsarConnectorMetricsTracker metricsTracker;
+    private boolean readOffloaded;
 
     // Stats total execution time of split
     private long startTime;
@@ -110,19 +112,20 @@ public class PulsarRecordCursor implements RecordCursor {
             throw new RuntimeException(e);
         }
         initialize(columnHandles, pulsarSplit, pulsarConnectorConfig,
-                pulsarConnectorCache.getManagedLedgerFactory(),
+                pulsarConnectorCache.getManagedLedgerFactory(), pulsarConnectorCache.getManagedLedgerConfig(),
                 new PulsarConnectorMetricsTracker(pulsarConnectorCache.getStatsProvider()));
     }
 
     // Exposed for testing purposes
     PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
-            pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
+            pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig,
+                       PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
         this.splitSize = pulsarSplit.getSplitSize();
-        initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, pulsarConnectorMetricsTracker);
+        initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, managedLedgerConfig, pulsarConnectorMetricsTracker);
     }
 
     private void initialize(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
-            pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory,
+            pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig,
                             PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
         this.columnHandles = columnHandles;
         this.pulsarSplit = pulsarSplit;
@@ -134,6 +137,7 @@ public class PulsarRecordCursor implements RecordCursor {
                 NamespaceName.get(pulsarSplit.getSchemaName()),
                 pulsarSplit.getTableName());
         this.metricsTracker = pulsarConnectorMetricsTracker;
+        this.readOffloaded = pulsarConnectorConfig.getManagedLedgerOffloadDriver() != null;
 
         Schema schema = PulsarConnectorUtils.parseSchema(pulsarSplit.getSchema());
 
@@ -143,7 +147,7 @@ public class PulsarRecordCursor implements RecordCursor {
 
         try {
             this.cursor = getCursor(TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()),
-                    pulsarSplit.getTableName()), pulsarSplit.getStartPosition(), managedLedgerFactory);
+                    pulsarSplit.getTableName()), pulsarSplit.getStartPosition(), managedLedgerFactory, managedLedgerConfig);
         } catch (ManagedLedgerException | InterruptedException e) {
             log.error(e, "Failed to get read only cursor");
             close();
@@ -168,11 +172,11 @@ public class PulsarRecordCursor implements RecordCursor {
     }
 
     private ReadOnlyCursor getCursor(TopicName topicName, Position startPosition, ManagedLedgerFactory
-            managedLedgerFactory)
+            managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig)
             throws ManagedLedgerException, InterruptedException {
 
         ReadOnlyCursor cursor = managedLedgerFactory.openReadOnlyCursor(topicName.getPersistenceNamingEncoding(),
-                startPosition, new ManagedLedgerConfig());
+                startPosition, managedLedgerConfig);
 
         return cursor;
     }
@@ -298,7 +302,6 @@ public class PulsarRecordCursor implements RecordCursor {
         public void run() {
 
             if (outstandingReadsRequests.get() > 0) {
-
                 if (!cursor.hasMoreEntries() || ((PositionImpl) cursor.getReadPosition())
                         .compareTo(pulsarSplit.getEndPosition()) >= 0) {
                     isDone = true;
@@ -307,8 +310,22 @@ public class PulsarRecordCursor implements RecordCursor {
                     int batchSize = Math.min(maxBatchSize, entryQueue.capacity() - entryQueue.size());
 
                     if (batchSize > 0) {
-                        outstandingReadsRequests.decrementAndGet();
-                        cursor.asyncReadEntries(batchSize, this, System.nanoTime());
+
+                        ReadOnlyCursorImpl readOnlyCursorImpl = ((ReadOnlyCursorImpl) cursor);
+                        // check if ledger is offloaded
+                        if (!readOffloaded  && readOnlyCursorImpl.getCurrentLedgerInfo().hasOffloadContext()) {
+                            log.warn("Ledger %s is offloaded for topic %s. Ignoring it because offloader is not configured",
+                                    readOnlyCursorImpl.getCurrentLedgerInfo().getLedgerId(), pulsarSplit.getTableName());
+
+                            long numEntries = readOnlyCursorImpl.getCurrentLedgerInfo().getEntries();
+                            long entriesToSkip = (numEntries - ((PositionImpl) cursor.getReadPosition()).getEntryId()) + 1;
+                            cursor.skipEntries(Math.toIntExact((entriesToSkip)));
+
+                            entriesProcessed += entriesToSkip;
+                        } else {
+                            outstandingReadsRequests.decrementAndGet();
+                            cursor.asyncReadEntries(batchSize, this, System.nanoTime());
+                        }
 
                         // stats for successful read request
                         metricsTracker.incr_READ_ATTEMPTS_SUCCESS();
@@ -507,8 +524,13 @@ public class PulsarRecordCursor implements RecordCursor {
             currentMessage.release();
         }
 
-        messageQueue.drain(RawMessage::release);
-        entryQueue.drain(Entry::release);
+        if (messageQueue != null) {
+            messageQueue.drain(RawMessage::release);
+        }
+
+        if (entryQueue != null) {
+            entryQueue.drain(Entry::release);
+        }
 
         if (deserializeEntries != null) {
             deserializeEntries.interrupt();
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index a2962e5..aa11a73 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -47,8 +47,8 @@ import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
-import org.apache.pulsar.shade.com.google.common.base.Predicate;
-import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
+import com.google.common.base.Predicate;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 
 import javax.inject.Inject;
 import java.util.Collection;
@@ -126,8 +126,6 @@ public class PulsarSplitManager implements ConnectorSplitManager {
     ManagedLedgerFactory getManagedLedgerFactory() throws Exception {
         ClientConfiguration bkClientConfiguration = new ClientConfiguration()
                 .setZkServers(this.pulsarConnectorConfig.getZookeeperUri())
-                .setAllowShadedLedgerManagerFactoryClass(true)
-                .setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.")
                 .setClientTcpNoDelay(false)
                 .setStickyReadsEnabled(true)
                 .setUseV2WireProtocol(true);
@@ -351,10 +349,10 @@ public class PulsarSplitManager implements ConnectorSplitManager {
                             // Just use a close bound since presto can always filter out the extra entries even if
                             // the bound
                             // should be open or a mixture of open and closed
-                            org.apache.pulsar.shade.com.google.common.collect.Range<PositionImpl> posRange
-                                    = org.apache.pulsar.shade.com.google.common.collect.Range.range(overallStartPos,
-                                    org.apache.pulsar.shade.com.google.common.collect.BoundType.CLOSED,
-                                    overallEndPos, org.apache.pulsar.shade.com.google.common.collect.BoundType.CLOSED);
+                            com.google.common.collect.Range<PositionImpl> posRange
+                                    = com.google.common.collect.Range.range(overallStartPos,
+                                    com.google.common.collect.BoundType.CLOSED,
+                                    overallEndPos, com.google.common.collect.BoundType.CLOSED);
 
                             long numOfEntries = readOnlyCursor.getNumberOfEntries(posRange) - 1;
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
index 3cabc8a..1e02d2ba 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.sql.presto;
 
-import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBuf;
 
 public interface SchemaHandler {
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 7a62f67..cedde20 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -27,13 +27,17 @@ import com.facebook.presto.spi.type.RealType;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarcharType;
 import io.airlift.log.Logger;
+import io.netty.buffer.ByteBuf;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -51,9 +55,9 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.shade.javax.ws.rs.ClientErrorException;
-import org.apache.pulsar.shade.javax.ws.rs.core.Response;
-import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.core.Response;
+import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -625,10 +629,10 @@ public abstract class TestPulsarConnector {
 
             Schema schema = topicsToSchemas.get(topicSchemaName).getType() == SchemaType.AVRO ? AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()) : JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
 
-            org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload = org.apache.pulsar.shade.io.netty.buffer.Unpooled
+            ByteBuf payload = io.netty.buffer.Unpooled
                     .copiedBuffer(schema.encode(foo));
 
-            org.apache.pulsar.shade.io.netty.buffer.ByteBuf byteBuf = serializeMetadataAndPayload(
+            ByteBuf byteBuf = serializeMetadataAndPayload(
                     Commands.ChecksumType.Crc32c, messageMetadata, payload);
 
             Entry entry = EntryImpl.create(0, i, byteBuf);
@@ -794,7 +798,7 @@ public abstract class TestPulsarConnector {
                 long entries = topicsToNumEntries.get(schemaName);
 
 
-                ReadOnlyCursor readOnlyCursor = mock(ReadOnlyCursor.class);
+                ReadOnlyCursorImpl readOnlyCursor = mock(ReadOnlyCursorImpl.class);
                 doReturn(entries).when(readOnlyCursor).getNumberOfEntries();
 
                 doAnswer(new Answer<Void>() {
@@ -875,10 +879,10 @@ public abstract class TestPulsarConnector {
 
                                     Schema schema = topicsToSchemas.get(schemaName).getType() == SchemaType.AVRO ? AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class);
 
-                                    org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload = org.apache.pulsar.shade.io.netty.buffer.Unpooled
+                                    ByteBuf payload = io.netty.buffer.Unpooled
                                             .copiedBuffer(schema.encode(foo));
 
-                                    org.apache.pulsar.shade.io.netty.buffer.ByteBuf byteBuf = serializeMetadataAndPayload(
+                                    ByteBuf byteBuf = serializeMetadataAndPayload(
                                             Commands.ChecksumType.Crc32c, messageMetadata, payload);
 
                                     completedBytes += byteBuf.readableBytes();
@@ -907,8 +911,8 @@ public abstract class TestPulsarConnector {
                     @Override
                     public Position answer(InvocationOnMock invocationOnMock) throws Throwable {
                         Object[] args = invocationOnMock.getArguments();
-                        org.apache.pulsar.shade.com.google.common.base.Predicate<Entry> predicate
-                                = (org.apache.pulsar.shade.com.google.common.base.Predicate<Entry>) args[1];
+                        com.google.common.base.Predicate<Entry> predicate
+                                = (com.google.common.base.Predicate<Entry>) args[1];
 
                         String schemaName = TopicName.get(
                                 TopicName.get(
@@ -933,13 +937,15 @@ public abstract class TestPulsarConnector {
                     @Override
                     public Long answer(InvocationOnMock invocationOnMock) throws Throwable {
                         Object[] args = invocationOnMock.getArguments();
-                        org.apache.pulsar.shade.com.google.common.collect.Range<PositionImpl>  range
-                                = (org.apache.pulsar.shade.com.google.common.collect.Range<PositionImpl> ) args[0];
+                        com.google.common.collect.Range<PositionImpl>  range
+                                = (com.google.common.collect.Range<PositionImpl> ) args[0];
 
                         return (range.upperEndpoint().getEntryId() + 1) - range.lowerEndpoint().getEntryId();
                     }
                 });
 
+                when(readOnlyCursor.getCurrentLedgerInfo()).thenReturn(MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder().setLedgerId(0).build());
+
                 return readOnlyCursor;
             }
         });
@@ -948,8 +954,10 @@ public abstract class TestPulsarConnector {
 
         for (Map.Entry<TopicName, PulsarSplit> split : splits.entrySet()) {
 
-            PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(fooColumnHandles, split.getValue(),
-                    pulsarConnectorConfig, managedLedgerFactory, new PulsarConnectorMetricsTracker(new NullStatsProvider())));
+            PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(
+                    fooColumnHandles, split.getValue(),
+                    pulsarConnectorConfig, managedLedgerFactory, new ManagedLedgerConfig(),
+                    new PulsarConnectorMetricsTracker(new NullStatsProvider())));
             this.pulsarRecordCursors.put(split.getKey(), pulsarRecordCursor);
         }
     }
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index 0803cd6..f03b34f 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -32,8 +32,8 @@ import org.apache.avro.Schema;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
-import org.apache.pulsar.shade.javax.ws.rs.ClientErrorException;
-import org.apache.pulsar.shade.javax.ws.rs.core.Response;
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.core.Response;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 


Mime
View raw message