pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [incubator-pulsar] branch master updated: [tiered storage] Provide LedgerOffloaderFactory for creating offloaders (#2392)
Date Fri, 17 Aug 2018 15:38:20 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 83d33d4  [tiered storage] Provide LedgerOffloaderFactory for creating offloaders
(#2392)
83d33d4 is described below

commit 83d33d4a7ff9523be7892324bdc322ed6480ad38
Author: Sijie Guo <guosijie@gmail.com>
AuthorDate: Fri Aug 17 08:38:17 2018 -0700

    [tiered storage] Provide LedgerOffloaderFactory for creating offloaders (#2392)
    
    * [tiered storage] Provide LedgerOffloaderFactory for creating offloaders
    
     ### Motivation
    
    In order to use NAR for packaging offloaders, we need a factory interface for creating
offloaders.
    
     ### Changes
    
    - Provide a ledger offloader factory interface for creating offloaders.
    - Move implementation-specific settings to the implementation package, to respect the offloader
factory interface
    
    * remove unneeded change
---
 .../apache/bookkeeper/mledger/LedgerOffloader.java |   2 +-
 .../bookkeeper/mledger/LedgerOffloaderFactory.java |  57 ++++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java | 120 +--------------------
 .../org/apache/pulsar/broker/PulsarService.java    |  33 ++----
 .../jcloud/JCloudLedgerOffloaderFactory.java       |  51 +++++++++
 .../jcloud/TieredStorageConfigurationData.java     |  30 +++++-
 6 files changed, 150 insertions(+), 143 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index 719b0c9..6885500 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -27,7 +27,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.api.ReadHandle;
 
 /**
- * Interface for offloading ledgers to longterm storage
+ * Interface for offloading ledgers to long-term storage
  */
 @Beta
 public interface LedgerOffloader {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
new file mode 100644
index 0000000..f0a6890
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+
+/**
+ * Factory to create {@link LedgerOffloader} to offload ledgers into long-term storage.
+ */
+@LimitedPrivate
+@Evolving
+public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
+
+    /**
+     * Check whether the provided driver <tt>driverName</tt> is supported.
+     *
+     * @param driverName offloader driver name
+     * @return true if the driver is supported, otherwise false.
+     */
+    boolean isDriverSupported(String driverName);
+
+    /**
+     * Create a ledger offloader with the provided configuration, user-metadata and scheduler.
+     *
+     * @param properties service configuration
+     * @param userMetadata user metadata
+     * @param scheduler scheduler
+     * @return the offloader instance
+     * @throws IOException when fail to create an offloader
+     */
+    T create(Properties properties,
+             Map<String, String> userMetadata,
+             OrderedScheduler scheduler)
+        throws IOException;
+
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c18ec2d..9efa2a6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -480,48 +480,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private boolean exposePublisherStats = true;
 
     /**** --- Ledger Offloading --- ****/
+    /****
+     * NOTES: all implementation related settings should be put in implementation package.
+     *        only common settings like driver name, io threads can be added here.
+     ****/
     // Driver to use to offload old data to long term storage
     private String managedLedgerOffloadDriver = null;
 
     // Maximum number of thread pool threads for ledger offloading
     private int managedLedgerOffloadMaxThreads = 2;
 
-    // For Amazon S3 ledger offload, AWS region
-    private String s3ManagedLedgerOffloadRegion = null;
-
-    // For Amazon S3 ledger offload, Bucket to place offloaded ledger into
-    private String s3ManagedLedgerOffloadBucket = null;
-
-    // For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing)
-    private String s3ManagedLedgerOffloadServiceEndpoint = null;
-
-    // For Amazon S3 ledger offload, Max block size in bytes.
-    @FieldContext(minValue = 5242880) // 5MB
-    private int s3ManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024; // 64MB
-
-    // For Amazon S3 ledger offload, Read buffer size in bytes.
-    @FieldContext(minValue = 1024)
-    private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB
-
-    // For Google Cloud Storage ledger offload, region where offload bucket is located.
-    // reference this page for more details: https://cloud.google.com/storage/docs/bucket-locations
-    private String gcsManagedLedgerOffloadRegion = null;
-
-    // For Google Cloud Storage ledger offload, Bucket to place offloaded ledger into
-    private String gcsManagedLedgerOffloadBucket = null;
-
-    // For Google Cloud Storage ledger offload, Max block size in bytes.
-    @FieldContext(minValue = 5242880) // 5MB
-    private int gcsManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024; // 64MB
-
-    // For Google Cloud Storage ledger offload, Read buffer size in bytes.
-    @FieldContext(minValue = 1024)
-    private int gcsManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB
-
-    // For Google Cloud Storage, path to json file containing service account credentials.
-    // For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849
-    private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;
-
     public String getZookeeperServers() {
         return zookeeperServers;
     }
@@ -1721,86 +1689,6 @@ public class ServiceConfiguration implements PulsarConfiguration {
         return this.managedLedgerOffloadMaxThreads;
     }
 
-    public void setS3ManagedLedgerOffloadRegion(String region) {
-        this.s3ManagedLedgerOffloadRegion = region;
-    }
-
-    public String getS3ManagedLedgerOffloadRegion() {
-        return this.s3ManagedLedgerOffloadRegion;
-    }
-
-    public void setS3ManagedLedgerOffloadBucket(String bucket) {
-        this.s3ManagedLedgerOffloadBucket = bucket;
-    }
-
-    public String getS3ManagedLedgerOffloadBucket() {
-        return this.s3ManagedLedgerOffloadBucket;
-    }
-
-    public void setS3ManagedLedgerOffloadServiceEndpoint(String endpoint) {
-        this.s3ManagedLedgerOffloadServiceEndpoint = endpoint;
-    }
-
-    public String getS3ManagedLedgerOffloadServiceEndpoint() {
-        return this.s3ManagedLedgerOffloadServiceEndpoint;
-    }
-
-    public void setS3ManagedLedgerOffloadMaxBlockSizeInBytes(int blockSizeInBytes) {
-        this.s3ManagedLedgerOffloadMaxBlockSizeInBytes = blockSizeInBytes;
-    }
-
-    public int getS3ManagedLedgerOffloadMaxBlockSizeInBytes() {
-        return this.s3ManagedLedgerOffloadMaxBlockSizeInBytes;
-    }
-
-    public void setS3ManagedLedgerOffloadReadBufferSizeInBytes(int readBufferSizeInBytes)
{
-        this.s3ManagedLedgerOffloadReadBufferSizeInBytes = readBufferSizeInBytes;
-    }
-
-    public int getS3ManagedLedgerOffloadReadBufferSizeInBytes() {
-        return this.s3ManagedLedgerOffloadReadBufferSizeInBytes;
-    }
-
-    public void setGcsManagedLedgerOffloadRegion(String region) {
-        this.gcsManagedLedgerOffloadRegion = region;
-    }
-
-    public String getGcsManagedLedgerOffloadRegion() {
-        return this.gcsManagedLedgerOffloadRegion;
-    }
-
-    public void setGcsManagedLedgerOffloadBucket(String bucket) {
-        this.gcsManagedLedgerOffloadBucket = bucket;
-    }
-
-    public String getGcsManagedLedgerOffloadBucket() {
-        return this.gcsManagedLedgerOffloadBucket;
-    }
-
-    public void setGcsManagedLedgerOffloadMaxBlockSizeInBytes(int blockSizeInBytes) {
-        this.gcsManagedLedgerOffloadMaxBlockSizeInBytes = blockSizeInBytes;
-    }
-
-    public int getGcsManagedLedgerOffloadMaxBlockSizeInBytes() {
-        return this.gcsManagedLedgerOffloadMaxBlockSizeInBytes;
-    }
-
-    public void setGcsManagedLedgerOffloadReadBufferSizeInBytes(int readBufferSizeInBytes)
{
-        this.gcsManagedLedgerOffloadReadBufferSizeInBytes = readBufferSizeInBytes;
-    }
-
-    public int getGcsManagedLedgerOffloadReadBufferSizeInBytes() {
-        return this.gcsManagedLedgerOffloadReadBufferSizeInBytes;
-    }
-
-    public void setGcsManagedLedgerOffloadServiceAccountKeyFile(String keyPath) {
-        this.gcsManagedLedgerOffloadServiceAccountKeyFile = keyPath;
-    }
-
-    public String getGcsManagedLedgerOffloadServiceAccountKeyFile() {
-        return this.gcsManagedLedgerOffloadServiceAccountKeyFile;
-    }
-
     public void setBrokerServiceCompactionMonitorIntervalInSeconds(int interval) {
         this.brokerServiceCompactionMonitorIntervalInSeconds = interval;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c4b89fb..d965c7e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -47,8 +47,10 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
+import org.apache.bookkeeper.mledger.offload.jcloud.JCloudLedgerOffloaderFactory;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -62,8 +64,6 @@ import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
 import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData;
-import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
@@ -664,11 +664,14 @@ public class PulsarService implements AutoCloseable {
     public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration
conf)
             throws PulsarServerException {
         try {
+            // TODO: will make this configurable when switching to use NAR loader to load
offloaders
+            LedgerOffloaderFactory offloaderFactory = JCloudLedgerOffloaderFactory.of();
+
             if (conf.getManagedLedgerOffloadDriver() != null
-                && BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver()))
{
+                && offloaderFactory.isDriverSupported(conf.getManagedLedgerOffloadDriver()))
{
                 try {
-                    return BlobStoreManagedLedgerOffloader.create(
-                        getTieredStorageConf(conf),
+                    return offloaderFactory.create(
+                        conf.getProperties(),
                         ImmutableMap.of(
                             METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
                             METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarBrokerVersionStringUtils.getGitSha()
@@ -685,26 +688,6 @@ public class PulsarService implements AutoCloseable {
         }
     }
 
-    private static TieredStorageConfigurationData getTieredStorageConf(ServiceConfiguration
serverConf) {
-        TieredStorageConfigurationData tsConf = new TieredStorageConfigurationData();
-        // generic settings
-        tsConf.setManagedLedgerOffloadDriver(serverConf.getManagedLedgerOffloadDriver());
-        tsConf.setManagedLedgerOffloadMaxThreads(serverConf.getManagedLedgerOffloadMaxThreads());
-        // s3 settings
-        tsConf.setS3ManagedLedgerOffloadRegion(serverConf.getS3ManagedLedgerOffloadRegion());
-        tsConf.setS3ManagedLedgerOffloadBucket(serverConf.getS3ManagedLedgerOffloadBucket());
-        tsConf.setS3ManagedLedgerOffloadServiceEndpoint(serverConf.getS3ManagedLedgerOffloadServiceEndpoint());
-        tsConf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(serverConf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
-        tsConf.setS3ManagedLedgerOffloadReadBufferSizeInBytes(serverConf.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
-        // gcs settings
-        tsConf.setGcsManagedLedgerOffloadRegion(serverConf.getGcsManagedLedgerOffloadRegion());
-        tsConf.setGcsManagedLedgerOffloadBucket(serverConf.getGcsManagedLedgerOffloadBucket());
-        tsConf.setGcsManagedLedgerOffloadServiceAccountKeyFile(serverConf.getGcsManagedLedgerOffloadServiceAccountKeyFile());
-        tsConf.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(serverConf.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());
-        tsConf.setGcsManagedLedgerOffloadReadBufferSizeInBytes(serverConf.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
-        return tsConf;
-    }
-
     public ZooKeeperCache getLocalZkCache() {
         return localZkCache;
     }
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
new file mode 100644
index 0000000..dffe253
--- /dev/null
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;
+
+/**
+ * A jcloud based offloader factory.
+ */
+public class JCloudLedgerOffloaderFactory implements LedgerOffloaderFactory<BlobStoreManagedLedgerOffloader>
{
+
+    public static JCloudLedgerOffloaderFactory of() {
+        return INSTANCE;
+    }
+
+    private static final JCloudLedgerOffloaderFactory INSTANCE = new JCloudLedgerOffloaderFactory();
+
+    @Override
+    public boolean isDriverSupported(String driverName) {
+        return BlobStoreManagedLedgerOffloader.driverSupported(driverName);
+    }
+
+    @Override
+    public BlobStoreManagedLedgerOffloader create(Properties properties,
+                                                  Map<String, String> userMetadata,
+                                                  OrderedScheduler scheduler) throws IOException
 {
+        TieredStorageConfigurationData data = TieredStorageConfigurationData.create(properties);
+        return BlobStoreManagedLedgerOffloader.create(data, userMetadata, scheduler);
+    }
+}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java
index 7c0d26a..52fedfd 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java
@@ -18,14 +18,19 @@
  */
 package org.apache.bookkeeper.mledger.offload.jcloud;
 
+import static org.apache.pulsar.common.util.FieldParser.value;
+
 import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Properties;
 import lombok.Data;
 
 /**
  * Configuration for tiered storage.
  */
 @Data
-public class TieredStorageConfigurationData implements Serializable, Cloneable{
+public class TieredStorageConfigurationData implements Serializable, Cloneable {
 
     /**** --- Ledger Offloading --- ****/
     // Driver to use to offload old data to long term storage
@@ -66,4 +71,27 @@ public class TieredStorageConfigurationData implements Serializable, Cloneable{
     // For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849
     private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;
 
+    /**
+     * Create a tiered storage configuration from the provided <tt>properties</tt>.
+     *
+     * @param properties the configuration properties
+     * @return tiered storage configuration
+     */
+    public static TieredStorageConfigurationData create(Properties properties) {
+        TieredStorageConfigurationData data = new TieredStorageConfigurationData();
+        Field[] fields = TieredStorageConfigurationData.class.getDeclaredFields();
+        Arrays.stream(fields).forEach(f -> {
+            if (properties.containsKey(f.getName())) {
+                try {
+                    f.setAccessible(true);
+                    f.set(data, value((String) properties.get(f.getName()), f));
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(String.format("failed to initialize
%s field while setting value %s",
+                            f.getName(), properties.get(f.getName())), e);
+                }
+            }
+        });
+        return data;
+    }
+
 }


Mime
View raw message