distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [07/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace
Date Thu, 05 Jan 2017 00:51:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
new file mode 100644
index 0000000..a7b17f4
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Optional;
+import org.apache.distributedlog.ZooKeeperClient.Credentials;
+import org.apache.distributedlog.ZooKeeperClient.DigestCredentials;
+import org.apache.distributedlog.exceptions.AlreadyClosedException;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.ZKException;
+import org.apache.distributedlog.net.NetUtils;
+import org.apache.distributedlog.util.ConfUtils;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import com.twitter.util.Return;
+import com.twitter.util.Throw;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.RetryPolicy;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.zookeeper.KeeperException;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * BookKeeper Client wrapper over {@link BookKeeper}.
+ *
+ * <h3>Metrics</h3>
+ * <ul>
+ * <li> bookkeeper operation stats are exposed under current scope by {@link BookKeeper}
+ * </ul>
+ */
+public class BookKeeperClient {
+    static final Logger LOG = LoggerFactory.getLogger(BookKeeperClient.class);
+
+    // Parameters to build bookkeeper client
+    private final DistributedLogConfiguration conf;
+    private final String name;
+    private final String zkServers;
+    private final String ledgersPath;
+    private final byte[] passwd;
+    private final ClientSocketChannelFactory channelFactory;
+    private final HashedWheelTimer requestTimer;
+    private final StatsLogger statsLogger;
+
+    // bookkeeper client state
+    private boolean closed = false;
+    private BookKeeper bkc = null;
+    private ZooKeeperClient zkc;
+    private final boolean ownZK;
+    // feature provider
+    private final Optional<FeatureProvider> featureProvider;
+
+    @SuppressWarnings("deprecation")
+    private synchronized void commonInitialization(
+            DistributedLogConfiguration conf, String ledgersPath,
+            ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer
requestTimer)
+        throws IOException, InterruptedException, KeeperException {
+        ClientConfiguration bkConfig = new ClientConfiguration();
+        bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout());
+        bkConfig.setReadTimeout(conf.getBKClientReadTimeout());
+        bkConfig.setZkLedgersRootPath(ledgersPath);
+        bkConfig.setZkTimeout(conf.getBKClientZKSessionTimeoutMilliSeconds());
+        bkConfig.setNumWorkerThreads(conf.getBKClientNumberWorkerThreads());
+        bkConfig.setEnsemblePlacementPolicy(RegionAwareEnsemblePlacementPolicy.class);
+        bkConfig.setZkRequestRateLimit(conf.getBKClientZKRequestRateLimit());
+        bkConfig.setProperty(RegionAwareEnsemblePlacementPolicy.REPP_DISALLOW_BOOKIE_PLACEMENT_IN_REGION_FEATURE_NAME,
+                DistributedLogConstants.DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME);
+        // reload configuration from dl configuration with settings prefixed with 'bkc.'
+        ConfUtils.loadConfiguration(bkConfig, conf, "bkc.");
+
+        Class<? extends DNSToSwitchMapping> dnsResolverCls;
+        try {
+            dnsResolverCls = conf.getEnsemblePlacementDnsResolverClass();
+        } catch (ConfigurationException e) {
+            LOG.error("Failed to load bk dns resolver : ", e);
+            throw new IOException("Failed to load bk dns resolver : ", e);
+        }
+        final DNSToSwitchMapping dnsResolver =
+                NetUtils.getDNSResolver(dnsResolverCls, conf.getBkDNSResolverOverrides());
+
+        this.bkc = BookKeeper.newBuilder()
+            .config(bkConfig)
+            .zk(zkc.get())
+            .channelFactory(channelFactory)
+            .statsLogger(statsLogger)
+            .dnsResolver(dnsResolver)
+            .requestTimer(requestTimer)
+            .featureProvider(featureProvider.orNull())
+            .build();
+    }
+
+    BookKeeperClient(DistributedLogConfiguration conf,
+                     String name,
+                     String zkServers,
+                     ZooKeeperClient zkc,
+                     String ledgersPath,
+                     ClientSocketChannelFactory channelFactory,
+                     HashedWheelTimer requestTimer,
+                     StatsLogger statsLogger,
+                     Optional<FeatureProvider> featureProvider) {
+        this.conf = conf;
+        this.name = name;
+        this.zkServers = zkServers;
+        this.ledgersPath = ledgersPath;
+        this.passwd = conf.getBKDigestPW().getBytes(UTF_8);
+        this.channelFactory = channelFactory;
+        this.requestTimer = requestTimer;
+        this.statsLogger = statsLogger;
+        this.featureProvider = featureProvider;
+        this.ownZK = null == zkc;
+        if (null != zkc) {
+            // reference the passing zookeeper client
+            this.zkc = zkc;
+        }
+    }
+
+    private synchronized void initialize() throws IOException {
+        if (null != this.bkc) {
+            return;
+        }
+        if (null == this.zkc) {
+            int zkSessionTimeout = conf.getBKClientZKSessionTimeoutMilliSeconds();
+            RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
+                        conf.getBKClientZKRetryBackoffStartMillis(),
+                        conf.getBKClientZKRetryBackoffMaxMillis(), conf.getBKClientZKNumRetries());
+            Credentials credentials = Credentials.NONE;
+            if (conf.getZkAclId() != null) {
+                credentials = new DigestCredentials(conf.getZkAclId(), conf.getZkAclId());
+            }
+
+            this.zkc = new ZooKeeperClient(name + ":zk", zkSessionTimeout, 2 * zkSessionTimeout,
zkServers,
+                                           retryPolicy, statsLogger.scope("bkc_zkc"), conf.getZKClientNumberRetryThreads(),
+                                           conf.getBKClientZKRequestRateLimit(), credentials);
+        }
+
+        try {
+            commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer);
+        } catch (InterruptedException e) {
+            throw new DLInterruptedException("Interrupted on creating bookkeeper client "
+ name + " : ", e);
+        } catch (KeeperException e) {
+            throw new ZKException("Error on creating bookkeeper client " + name + " : ",
e);
+        }
+
+        if (ownZK) {
+            LOG.info("BookKeeper Client created {} with its own ZK Client : ledgersPath =
{}, numRetries = {}, " +
+                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}",
+                    new Object[] { name, ledgersPath,
+                    conf.getBKClientZKNumRetries(), conf.getBKClientZKSessionTimeoutMilliSeconds(),
+                    conf.getBKClientZKRetryBackoffStartMillis(), conf.getBKClientZKRetryBackoffMaxMillis(),
+                    conf.getBkDNSResolverOverrides() });
+        } else {
+            LOG.info("BookKeeper Client created {} with shared zookeeper client : ledgersPath
= {}, numRetries = {}, " +
+                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}",
+                    new Object[] { name, ledgersPath,
+                    conf.getZKNumRetries(), conf.getZKSessionTimeoutMilliseconds(),
+                    conf.getZKRetryBackoffStartMillis(), conf.getZKRetryBackoffMaxMillis(),
+                    conf.getBkDNSResolverOverrides() });
+        }
+    }
+
+
+    public synchronized BookKeeper get() throws IOException {
+        checkClosedOrInError();
+        if (null == bkc) {
+            initialize();
+        }
+        return bkc;
+    }
+
+    // Util functions
+    public Future<LedgerHandle> createLedger(int ensembleSize,
+                                             int writeQuorumSize,
+                                             int ackQuorumSize) {
+        BookKeeper bk;
+        try {
+            bk = get();
+        } catch (IOException ioe) {
+            return Future.exception(ioe);
+        }
+        final Promise<LedgerHandle> promise = new Promise<LedgerHandle>();
+        bk.asyncCreateLedger(ensembleSize, writeQuorumSize, ackQuorumSize,
+                BookKeeper.DigestType.CRC32, passwd, new AsyncCallback.CreateCallback() {
+                    @Override
+                    public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+                        if (BKException.Code.OK == rc) {
+                            promise.updateIfEmpty(new Return<LedgerHandle>(lh));
+                        } else {
+                            promise.updateIfEmpty(new Throw<LedgerHandle>(BKException.create(rc)));
+                        }
+                    }
+                }, null);
+        return promise;
+    }
+
+    public Future<Void> deleteLedger(long lid,
+                                     final boolean ignoreNonExistentLedger) {
+        BookKeeper bk;
+        try {
+            bk = get();
+        } catch (IOException ioe) {
+            return Future.exception(ioe);
+        }
+        final Promise<Void> promise = new Promise<Void>();
+        bk.asyncDeleteLedger(lid, new AsyncCallback.DeleteCallback() {
+            @Override
+            public void deleteComplete(int rc, Object ctx) {
+                if (BKException.Code.OK == rc) {
+                    promise.updateIfEmpty(new Return<Void>(null));
+                } else if (BKException.Code.NoSuchLedgerExistsException == rc) {
+                    if (ignoreNonExistentLedger) {
+                        promise.updateIfEmpty(new Return<Void>(null));
+                    } else {
+                        promise.updateIfEmpty(new Throw<Void>(BKException.create(rc)));
+                    }
+                } else {
+                    promise.updateIfEmpty(new Throw<Void>(BKException.create(rc)));
+                }
+            }
+        }, null);
+        return promise;
+    }
+
+    public void close() {
+        BookKeeper bkcToClose;
+        ZooKeeperClient zkcToClose;
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            bkcToClose = bkc;
+            zkcToClose = zkc;
+        }
+
+        LOG.info("BookKeeper Client closed {}", name);
+        if (null != bkcToClose) {
+            try {
+                bkcToClose.close();
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted on closing bookkeeper client {} : ", name, e);
+                Thread.currentThread().interrupt();
+            } catch (BKException e) {
+                LOG.warn("Error on closing bookkeeper client {} : ", name, e);
+            }
+        }
+        if (null != zkcToClose) {
+            if (ownZK) {
+                zkcToClose.close();
+            }
+        }
+    }
+
+    public synchronized void checkClosedOrInError() throws AlreadyClosedException {
+        if (closed) {
+            LOG.error("BookKeeper Client {} is already closed", name);
+            throw new AlreadyClosedException("BookKeeper Client " + name + " is already closed");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java
b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java
new file mode 100644
index 0000000..a356f9f
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.apache.bookkeeper.feature.FeatureProvider;
+
+import org.apache.bookkeeper.feature.Feature;
+
+/**
+ * Builder to build bookkeeper client.
+ */
+public class BookKeeperClientBuilder {
+
+    /**
+     * Create a bookkeeper client builder to build bookkeeper clients.
+     *
+     * @return bookkeeper client builder.
+     */
+    public static BookKeeperClientBuilder newBuilder() {
+        return new BookKeeperClientBuilder();
+    }
+
+    // client name
+    private String name = null;
+    // dl config
+    private DistributedLogConfiguration dlConfig = null;
+    // bookkeeper settings
+    // zookeeper client
+    private ZooKeeperClient zkc = null;
+    // or zookeeper servers
+    private String zkServers = null;
+    // ledgers path
+    private String ledgersPath = null;
+    // statsLogger
+    private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+    // client channel factory
+    private ClientSocketChannelFactory channelFactory = null;
+    // request timer
+    private HashedWheelTimer requestTimer = null;
+    // feature provider
+    private Optional<FeatureProvider> featureProvider = Optional.absent();
+
+    // Cached BookKeeper Client
+    private BookKeeperClient cachedClient = null;
+
+    /**
+     * Private bookkeeper builder.
+     */
+    private BookKeeperClientBuilder() {}
+
+    /**
+     * Set client name.
+     *
+     * @param name
+     *          client name.
+     * @return builder
+     */
+    public synchronized BookKeeperClientBuilder name(String name) {
+        this.name = name;
+        return this;
+    }
+
+    /**
+     * <i>dlConfig</i> used to configure bookkeeper client.
+     *
+     * @param dlConfig
+     *          distributedlog config.
+     * @return builder.
+     */
+    public synchronized BookKeeperClientBuilder dlConfig(DistributedLogConfiguration dlConfig)
{
+        this.dlConfig = dlConfig;
+        return this;
+    }
+
+    /**
+     * Set the zkc used to build bookkeeper client. If a zookeeper client is provided in
this
+     * method, bookkeeper client will use it rather than creating a brand new one.
+     *
+     * @param zkc
+     *          zookeeper client.
+     * @return builder
+     * @see #zkServers(String)
+     */
+    public synchronized BookKeeperClientBuilder zkc(ZooKeeperClient zkc) {
+        this.zkc = zkc;
+        return this;
+    }
+
+    /**
+     * Set the zookeeper servers that bookkeeper client would connect to. If no zookeeper
client
+     * is provided by {@link #zkc(ZooKeeperClient)}, bookkeeper client will use the given
string
+     * to create a brand new zookeeper client.
+     *
+     * @param zkServers
+     *          zookeeper servers that bookkeeper client would connect to.
+     * @return builder
+     * @see #zkc(ZooKeeperClient)
+     */
+    public synchronized BookKeeperClientBuilder zkServers(String zkServers) {
+        this.zkServers = zkServers;
+        return this;
+    }
+
+    /**
+     * Set the ledgers path that bookkeeper client is going to access.
+     *
+     * @param ledgersPath
+     *          ledgers path
+     * @return builder
+     * @see org.apache.bookkeeper.conf.ClientConfiguration#getZkLedgersRootPath()
+     */
+    public synchronized BookKeeperClientBuilder ledgersPath(String ledgersPath) {
+        this.ledgersPath = ledgersPath;
+        return this;
+    }
+
+    /**
+     * Build BookKeeper client using existing <i>bkc</i> client.
+     *
+     * @param bkc
+     *          bookkeeper client.
+     * @return builder
+     */
+    public synchronized BookKeeperClientBuilder bkc(BookKeeperClient bkc) {
+        this.cachedClient = bkc;
+        return this;
+    }
+
+    /**
+     * Build BookKeeper client using existing <i>channelFactory</i>.
+     *
+     * @param channelFactory
+     *          Channel Factory used to build bookkeeper client.
+     * @return bookkeeper client builder.
+     */
+    public synchronized BookKeeperClientBuilder channelFactory(ClientSocketChannelFactory
channelFactory) {
+        this.channelFactory = channelFactory;
+        return this;
+    }
+
+    /**
+     * Build BookKeeper client using existing <i>request timer</i>.
+     *
+     * @param requestTimer
+     *          HashedWheelTimer used to build bookkeeper client.
+     * @return bookkeeper client builder.
+     */
+    public synchronized BookKeeperClientBuilder requestTimer(HashedWheelTimer requestTimer)
{
+        this.requestTimer = requestTimer;
+        return this;
+    }
+
+    /**
+     * Build BookKeeper Client using given stats logger <i>statsLogger</i>.
+     *
+     * @param statsLogger
+     *          stats logger to report stats
+     * @return builder.
+     */
+    public synchronized BookKeeperClientBuilder statsLogger(StatsLogger statsLogger) {
+        this.statsLogger = statsLogger;
+        return this;
+    }
+
+    public synchronized BookKeeperClientBuilder featureProvider(Optional<FeatureProvider>
featureProvider) {
+        this.featureProvider = featureProvider;
+        return this;
+    }
+
+    private void validateParameters() {
+        Preconditions.checkNotNull(name, "Missing client name.");
+        Preconditions.checkNotNull(dlConfig, "Missing DistributedLog Configuration.");
+        Preconditions.checkArgument(null == zkc || null == zkServers, "Missing zookeeper
setting.");
+        Preconditions.checkNotNull(ledgersPath, "Missing Ledgers Root Path.");
+    }
+
+    public synchronized BookKeeperClient build() {
+        if (null == cachedClient) {
+            cachedClient = buildClient();
+        }
+        return cachedClient;
+    }
+
+    private BookKeeperClient buildClient() {
+        validateParameters();
+        return new BookKeeperClient(dlConfig, name, zkServers, zkc, ledgersPath, channelFactory,
requestTimer, statsLogger, featureProvider);
+    }
+}


Mime
View raw message