bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [1/3] bookkeeper git commit: BOOKKEEPER-950: Ledger placement policy to accommodate different storage capacity of bookies
Date Tue, 28 Mar 2017 20:35:33 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 1e4ccaf16 -> 0583175de


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
new file mode 100644
index 0000000..88c5eb1
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.util.MathUtils;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
+    private final static Logger LOG = LoggerFactory.getLogger(GetBookieInfoProcessorV3.class);
+
+    public GetBookieInfoProcessorV3(Request request, Channel channel,
+                                     BookieRequestProcessor requestProcessor) {
+        super(request, channel, requestProcessor);
+    }
+
+    private GetBookieInfoResponse getGetBookieInfoResponse() {
+        long startTimeNanos = MathUtils.nowInNano();
+        GetBookieInfoRequest getBookieInfoRequest = request.getGetBookieInfoRequest();
+        long requested = getBookieInfoRequest.getRequested();
+
+        GetBookieInfoResponse.Builder getBookieInfoResponse = GetBookieInfoResponse.newBuilder();
+
+        if (!isVersionCompatible()) {
+            getBookieInfoResponse.setStatus(StatusCode.EBADVERSION);
+            requestProcessor.getBookieInfoStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+                    TimeUnit.NANOSECONDS);
+            return getBookieInfoResponse.build();
+        }
+
+        LOG.debug("Received new getBookieInfo request: {}", request);
+        StatusCode status = StatusCode.EOK;
+        long freeDiskSpace = 0L, totalDiskSpace = 0L;
+        if ((requested & GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) {
+            freeDiskSpace = requestProcessor.bookie.getTotalFreeSpace();
+            getBookieInfoResponse.setFreeDiskSpace(freeDiskSpace);
+        }
+        if ((requested & GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE) != 0) {
+            totalDiskSpace = requestProcessor.bookie.getTotalDiskSpace();
+            getBookieInfoResponse.setTotalDiskCapacity(totalDiskSpace);
+        }
+        LOG.debug("FreeDiskSpace info is " + freeDiskSpace + " totalDiskSpace is: " + totalDiskSpace);
+        getBookieInfoResponse.setStatus(status);
+        requestProcessor.getBookieInfoStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+                TimeUnit.NANOSECONDS);
+        return getBookieInfoResponse.build();
+    }
+
+    @Override
+    public void safeRun() {
+        GetBookieInfoResponse getBookieInfoResponse = getGetBookieInfoResponse();
+        sendResponse(getBookieInfoResponse);
+    }
+
+    private void sendResponse(GetBookieInfoResponse getBookieInfoResponse) {
+        Response.Builder response = Response.newBuilder()
+                .setHeader(getHeader())
+                .setStatus(getBookieInfoResponse.getStatus())
+                .setGetBookieInfoResponse(getBookieInfoResponse);
+        sendResponse(response.getStatus(),
+                     response.build(),
+                     requestProcessor.getBookieInfoStats);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 580a905..f6e9e8f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -32,25 +32,29 @@ import org.apache.bookkeeper.auth.ClientAuthProvider;
 import com.google.protobuf.ByteString;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -127,6 +131,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     final int addEntryTimeout;
     final int readEntryTimeout;
     final int maxFrameSize;
+    final int getBookieInfoTimeout;
 
     private final ConcurrentHashMap<CompletionKey, CompletionValue> completionObjects = new ConcurrentHashMap<CompletionKey, CompletionValue>();
 
@@ -139,6 +144,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     private final OpStatsLogger addTimeoutOpLogger;
     private final OpStatsLogger writeLacTimeoutOpLogger;
     private final OpStatsLogger readLacTimeoutOpLogger;
+    private final OpStatsLogger getBookieInfoOpLogger;
+    private final OpStatsLogger getBookieInfoTimeoutOpLogger;
 
     /**
      * The following member variables do not need to be concurrent, or volatile
@@ -194,6 +201,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         this.requestTimer = requestTimer;
         this.addEntryTimeout = conf.getAddEntryTimeout();
         this.readEntryTimeout = conf.getReadEntryTimeout();
+        this.getBookieInfoTimeout = conf.getBookieInfoTimeout();
 
         this.authProviderFactory = authProviderFactory;
         this.extRegistry = extRegistry;
@@ -209,10 +217,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         addEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP);
         writeLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_WRITE_LAC_OP);
         readLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_LAC_OP);
+        getBookieInfoOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.GET_BOOKIE_INFO_OP);
         readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ);
         addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD);
         writeLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC);
         readLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC);
+        getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);
 
         this.pcbcPool = pcbcPool;
 
@@ -674,6 +684,58 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         }
     }
 
+    public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object ctx) {
+        final long txnId = getTxnId();
+        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.GET_BOOKIE_INFO);
+        completionObjects.put(completionKey,
+                new GetBookieInfoCompletion(this, getBookieInfoOpLogger, cb, ctx,
+                                   scheduleTimeout(completionKey, getBookieInfoTimeout)));
+
+        // Build the GetBookieInfo request: a V3 packet header plus the bitmask of requested fields.
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.GET_BOOKIE_INFO)
+                .setTxnId(txnId);
+
+        GetBookieInfoRequest.Builder getBookieInfoBuilder = GetBookieInfoRequest.newBuilder()
+                .setRequested(requested);
+
+        final Request getBookieInfoRequest = Request.newBuilder()
+                .setHeader(headerBuilder)
+                .setGetBookieInfoRequest(getBookieInfoBuilder)
+                .build();
+
+        final Channel c = channel;
+        if (c == null) {
+            errorOutReadKey(completionKey);
+            return;
+        }
+
+        try{
+            ChannelFuture future = c.write(getBookieInfoRequest);
+            future.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (future.isSuccess()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Successfully wrote request {} to {}",
+                                    getBookieInfoRequest, c.getRemoteAddress());
+                        }
+                    } else {
+                        if (!(future.getCause() instanceof ClosedChannelException)) {
+                            LOG.warn("Writing GetBookieInfoRequest(flags={}) to channel {} failed : ",
+                                    new Object[] { requested, c, future.getCause() });
+                        }
+                        errorOutReadKey(completionKey);
+                    }
+                }
+            });
+        } catch(Throwable e) {
+            LOG.warn("Get metadata operation {} failed", getBookieInfoRequest, e);
+            errorOutReadKey(completionKey);
+        }
+    }
+
     /**
      * Disconnects the bookie client. It can be reused.
      */
@@ -848,6 +910,29 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         });
     }
 
+    void errorOutGetBookieInfoKey(final CompletionKey key) {
+        errorOutGetBookieInfoKey(key, BKException.Code.BookieHandleNotAvailableException);
+    }
+
+    void errorOutGetBookieInfoKey(final CompletionKey key, final int rc) {
+        final GetBookieInfoCompletion getBookieInfoCompletion = (GetBookieInfoCompletion)completionObjects.remove(key);
+        if (null == getBookieInfoCompletion) {
+            return;
+        }
+        executor.submit(new SafeRunnable() {
+            @Override
+            public void safeRun() {
+                String bAddress = "null";
+                Channel c = channel;
+                if (c != null) {
+                    bAddress = c.getRemoteAddress().toString();
+                }
+                LOG.debug("Could not write getBookieInfo request for bookie: {}", new Object[] {bAddress});
+                getBookieInfoCompletion.cb.getBookieInfoComplete(rc, new BookieInfo(), getBookieInfoCompletion.ctx);
+            }
+        });
+    }
+
     /**
      * Errors out pending entries. We call this method from one thread to avoid
      * concurrent executions to QuorumOpMonitor (implements callbacks). It seems
@@ -1009,6 +1094,9 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                         case READ_LAC:
                             handleReadLacResponse(response.getReadLacResponse(), completionValue);
                             break;
+                        case GET_BOOKIE_INFO:
+                            handleGetBookieInfoResponse(response, completionValue);
+                            break;
                         default:
                             LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring",
                                       type, addr);
@@ -1134,6 +1222,33 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer.slice(), rc.ctx);
     }
 
+    void handleGetBookieInfoResponse(Response response, CompletionValue completionValue) {
+        // The completion value should always be an instance of a GetBookieInfoCompletion object when we reach here.
+        GetBookieInfoCompletion rc = (GetBookieInfoCompletion)completionValue;
+        GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
+
+        long freeDiskSpace = getBookieInfoResponse.hasFreeDiskSpace() ? getBookieInfoResponse.getFreeDiskSpace() : 0L;
+        long totalDiskCapacity = getBookieInfoResponse.hasTotalDiskCapacity() ? getBookieInfoResponse.getTotalDiskCapacity() : 0L;
+
+        StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Got response for read metadata request from bookie: {} rc {}", addr, rc);
+        }
+
+        // convert to BKException code because that's what the upper
+        // layers expect. This is UGLY, there should just be one set of
+        // error codes.
+        Integer rcToRet = statusCodeToExceptionCode(status);
+        if (null == rcToRet) {
+            LOG.error("Read metadata failed on bookie:{} with code:{}",
+                      new Object[] { addr, status });
+            rcToRet = BKException.Code.ReadException;
+        }
+        LOG.debug("Response received from bookie info read: freeDiskSpace=" +  freeDiskSpace + " totalDiskSpace:" + totalDiskCapacity);
+        rc.cb.getBookieInfoComplete(rcToRet, new BookieInfo(totalDiskCapacity, freeDiskSpace), rc.ctx);
+    }
+
     /**
      * Boiler-plate wrapper classes follow
      *
@@ -1257,6 +1372,42 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     // visible for testing
+    static class GetBookieInfoCompletion extends CompletionValue {
+        final GetBookieInfoCallback cb;
+
+        public GetBookieInfoCompletion(final PerChannelBookieClient pcbc, GetBookieInfoCallback cb, Object ctx) {
+            this(pcbc, null, cb, ctx, null);
+        }
+
+        public GetBookieInfoCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger getBookieInfoOpLogger,
+                              final GetBookieInfoCallback originalCallback,
+                              final Object originalCtx, final Timeout timeout) {
+            super(originalCtx, 0L, 0L, timeout);
+            final long startTime = MathUtils.nowInNano();
+            this.cb = (null == getBookieInfoOpLogger) ? originalCallback : new GetBookieInfoCallback() {
+                @Override
+                public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) {
+                    cancelTimeout();
+                    if (getBookieInfoOpLogger != null) {
+                        long latency = MathUtils.elapsedNanos(startTime);
+                        if (rc != BKException.Code.OK) {
+                            getBookieInfoOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
+                        } else {
+                            getBookieInfoOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+                        }
+                    }
+
+                    if (rc != BKException.Code.OK && !expectedBkOperationErrors.contains(rc)) {
+                        pcbc.recordError();
+                    }
+
+                    originalCallback.getBookieInfoComplete(rc, bInfo, originalCtx);
+                }
+            };
+        }
+    }
+
+    // visible for testing
     static class AddCompletion extends CompletionValue {
         final WriteCallback cb;
 
@@ -1355,9 +1506,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
             } else if (OperationType.WRITE_LAC == operationType) {
                 errorOutWriteLacKey(this, BKException.Code.TimeoutException);
                 writeLacTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
-            } else {
+            } else if (OperationType.READ_LAC == operationType) {
                 errorOutReadLacKey(this, BKException.Code.TimeoutException);
                 readLacTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
+            } else {
+                errorOutGetBookieInfoKey(this, BKException.Code.TimeoutException);
+                getBookieInfoTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
             }
 	}
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
index 9ce9baf..504e231 100644
--- a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
@@ -59,6 +59,7 @@ enum OperationType {
     AUTH = 5;
     WRITE_LAC = 6;
     READ_LAC = 7;
+    GET_BOOKIE_INFO = 8;
 }
 
 /**
@@ -78,6 +79,7 @@ message Request {
     optional AuthMessage authRequest = 102;
     optional WriteLacRequest writeLacRequest = 103;
     optional ReadLacRequest readLacRequest = 104;
+    optional GetBookieInfoRequest getBookieInfoRequest = 105;
 }
 
 message ReadRequest {
@@ -114,6 +116,15 @@ message ReadLacRequest {
     required int64 ledgerId = 1;
 }
 
+message GetBookieInfoRequest {
+    enum Flags {
+        TOTAL_DISK_CAPACITY = 0x01;
+        FREE_DISK_SPACE = 0x02;
+    }
+    // bitwise OR of Flags
+    optional int64 requested = 1;
+}
+
 message Response {
 
     required BKPacketHeader header = 1;
@@ -126,6 +137,7 @@ message Response {
     optional AuthMessage authResponse = 102;
     optional WriteLacResponse writeLacResponse = 103;
     optional ReadLacResponse readLacResponse = 104;
+    optional GetBookieInfoResponse getBookieInfoResponse = 105;
 }
 
 message ReadResponse {
@@ -157,3 +169,9 @@ message ReadLacResponse {
     optional bytes lacBody = 3; // lac sent by PutLacRequest
     optional bytes lastEntryBody = 4; // Actual last entry on the disk
 }
+
+message GetBookieInfoResponse {
+    required StatusCode status = 1;
+    optional int64 totalDiskCapacity = 2;
+    optional int64 freeDiskSpace = 3;
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
new file mode 100644
index 0000000..2f885c0
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
@@ -0,0 +1,452 @@
+package org.apache.bookkeeper.client;
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.MathUtils;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that ledger placement honors the free disk space (weight) advertised by bookies.
+ */
+public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperClusterTestCase {
+    private final static Logger LOG = LoggerFactory.getLogger(BookKeeperDiskSpaceWeightedLedgerPlacementTest.class);
+
+    public BookKeeperDiskSpaceWeightedLedgerPlacementTest() {
+        super(10);
+    }
+    
+    private BookieServer restartBookie(ServerConfiguration conf, final long initialFreeDiskSpace,
+            final long finallFreeDiskSpace, final int delaySecs) throws Exception {
+        Bookie bookieWithCustomFreeDiskSpace = new Bookie(conf) {
+            long startTime = System.currentTimeMillis();
+            @Override
+            public long getTotalFreeSpace() {
+                if (startTime == 0) {
+                    startTime = System.currentTimeMillis();
+                }
+                if (delaySecs == 0 || ((System.currentTimeMillis()) - startTime < delaySecs*1000)) {
+                    return initialFreeDiskSpace;
+                } else {
+                    // after delaySecs, advertise finallFreeDiskSpace; before that advertise initialFreeDiskSpace
+                    return finallFreeDiskSpace;
+                }
+            }
+        };
+        bsConfs.add(conf);
+        BookieServer server = startBookie(conf, bookieWithCustomFreeDiskSpace);
+        bs.add(server);
+        return server;
+    }
+
+    private BookieServer replaceBookieWithCustomFreeDiskSpaceBookie(int bookieIdx, final long freeDiskSpace)
+            throws Exception {
+        LOG.info("Killing bookie " + bs.get(bookieIdx).getLocalAddress());
+        bs.get(bookieIdx).getLocalAddress();
+        ServerConfiguration conf = killBookie(bookieIdx);
+        return restartBookie(conf, freeDiskSpace, freeDiskSpace, 0);
+    }
+
+    private BookieServer replaceBookieWithCustomFreeDiskSpaceBookie(BookieServer bookie, final long freeDiskSpace)
+            throws Exception {
+        for (int i=0; i < bs.size(); i++) {
+            if (bs.get(i).getLocalAddress().equals(bookie.getLocalAddress())) {
+                return replaceBookieWithCustomFreeDiskSpaceBookie(i, freeDiskSpace);
+            }
+        }
+        return null;
+    }
+
+    private BookieServer replaceBookieWithCustomFreeDiskSpaceBookie(int bookieIdx, long initialFreeDiskSpace,
+             long finalFreeDiskSpace, int delay) throws Exception {
+        LOG.info("Killing bookie " + bs.get(bookieIdx).getLocalAddress());
+        bs.get(bookieIdx).getLocalAddress();
+        ServerConfiguration conf = killBookie(bookieIdx);
+        return restartBookie(conf, initialFreeDiskSpace, finalFreeDiskSpace, delay);
+    }
+
+    /**
+     * Test to show that weight based selection honors the disk weight of bookies
+     */
+    @Test(timeout=60000)
+    public void testDiskSpaceWeightedBookieSelection() throws Exception {
+        long freeDiskSpace=1000000L;
+        int multiple=3;
+        for (int i=0; i < numBookies; i++) {
+            // the first 8 bookies have freeDiskSpace of 1MB, while the remaining 2 have 3MB
+            if (i < numBookies-2) {
+                replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace);
+            } else {
+                replaceBookieWithCustomFreeDiskSpaceBookie(0, multiple*freeDiskSpace);
+            }
+        }
+        Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>();
+        for (BookieServer b : bs) {
+            m.put(b.getLocalAddress(), 0);
+        }
+
+        // wait 200 msecs each for the bookies to come up and for the bookieInfo to be retrieved by the client
+        ClientConfiguration conf = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true).
+            setBookieMaxWeightMultipleForWeightBasedPlacement(multiple);
+        Thread.sleep(200);
+        final BookKeeper client = new BookKeeper(conf);
+        Thread.sleep(200);
+        for (int i = 0; i < 2000; i++) {
+            LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
+            for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
+                m.put(b, m.get(b)+1);
+            }
+        }
+        client.close();
+        // make sure that bookies with higher weight(the last 2 bookies) are chosen 3X as often as the median;
+        // since the number of ledgers created is small (2000), we allow a range of 2X to 4X instead of the exact 3X
+        for (int i=0; i < numBookies-2; i++) {
+            double ratio1 = (double)m.get(bs.get(numBookies-2).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress());
+            assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1-multiple), Math.abs(ratio1-multiple) < 1);
+            double ratio2 = (double)m.get(bs.get(numBookies-1).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress());
+            assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2-multiple), Math.abs(ratio2-multiple) < 1);
+        }
+    }
+
+    /**
+     * Test to show that weight based selection honors the disk weight of bookies and also adapts
+     * when the bookies's weight changes.
+     */
+    @Test(timeout=60000)
+    public void testDiskSpaceWeightedBookieSelectionWithChangingWeights() throws Exception {
+        long freeDiskSpace=1000000L;
+        int multiple=3;
+        for (int i=0; i < numBookies; i++) {
+            // the first 8 bookies have freeDiskSpace of 1MB, while the remaining 2 have 3MB
+            if (i < numBookies-2) {
+                replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace);
+            } else {
+                replaceBookieWithCustomFreeDiskSpaceBookie(0, multiple*freeDiskSpace);
+            }
+        }
+        Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>();
+        for (BookieServer b : bs) {
+            m.put(b.getLocalAddress(), 0);
+        }
+
+        // wait 100 msecs each for the bookies to come up and for the bookieInfo to be retrieved by the client
+        ClientConfiguration conf = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true).
+            setBookieMaxWeightMultipleForWeightBasedPlacement(multiple);
+        Thread.sleep(100);
+        final BookKeeper client = new BookKeeper(conf);
+        Thread.sleep(100);
+        for (int i = 0; i < 2000; i++) {
+            LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
+            for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
+                m.put(b, m.get(b)+1);
+            }
+        }
+
+        // make sure that bookies with higher weight(the last 2 bookies) are chosen 3X as often as the median;
+        // since the number of ledgers created is small (2000), we allow a range of 2X to 4X instead of the exact 3X
+        for (int i=0; i < numBookies-2; i++) {
+            double ratio1 = (double)m.get(bs.get(numBookies-2).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress());
+            assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1-multiple), Math.abs(ratio1-multiple) < 1);
+            double ratio2 = (double)m.get(bs.get(numBookies-1).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress());
+            assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2-multiple), Math.abs(ratio2-multiple) < 1);
+        }
+
+        // Restart the bookies in such a way that the first 2 bookies go from 1MB to 3MB free space and the last
+        // 2 bookies go from 3MB to 1MB
+        BookieServer server1 = bs.get(0);
+        BookieServer server2 = bs.get(1);
+        BookieServer server3 = bs.get(numBookies-2);
+        BookieServer server4 = bs.get(numBookies-1);
+
+        server1 = replaceBookieWithCustomFreeDiskSpaceBookie(server1, multiple*freeDiskSpace);
+        server2 = replaceBookieWithCustomFreeDiskSpaceBookie(server2, multiple*freeDiskSpace);
+        server3 = replaceBookieWithCustomFreeDiskSpaceBookie(server3, freeDiskSpace);
+        server4 = replaceBookieWithCustomFreeDiskSpaceBookie(server4, freeDiskSpace);
+
+        Thread.sleep(100);
+        for (BookieServer b : bs) {
+            m.put(b.getLocalAddress(), 0);
+        }
+        for (int i = 0; i < 2000; i++) {
+            LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
+            for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
+                m.put(b, m.get(b)+1);
+            }
+        }
+
+        // make sure that the bookies that now have the higher weight (server1 and server2, restarted above with
+        // more free space) are chosen about `multiple` times as often as each of the others, within a tolerance of 1
+        for (int i=0; i < numBookies; i++) {
+            if (server1.getLocalAddress().equals(bs.get(i).getLocalAddress()) ||
+                server2.getLocalAddress().equals(bs.get(i).getLocalAddress())) {
+                continue;
+            }
+            double ratio1 = (double)m.get(server1.getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress());
+            assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1-multiple), Math.abs(ratio1-multiple) < 1);
+            double ratio2 = (double)m.get(server2.getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress());
+            assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2-multiple), Math.abs(ratio2-multiple) < 1);
+        }
+        client.close();
+    }
+
+    /**
+     * Test to show that weight based selection honors the disk weight of bookies and also adapts
+     * when bookies go away permanently.
+     */
+    @Test(timeout=60000)
+    public void testDiskSpaceWeightedBookieSelectionWithBookiesDying() throws Exception {
+        long freeDiskSpace=1000000L;
+        int multiple=3;
+        for (int i=0; i < numBookies; i++) {
+            // the first numBookies-2 bookies get a freeDiskSpace of 1MB, while the remaining 2 get 3MB
+            if (i < numBookies-2) {
+                replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace);
+            } else {
+                replaceBookieWithCustomFreeDiskSpaceBookie(0, multiple*freeDiskSpace);
+            }
+        }
+        Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>();
+        for (BookieServer b : bs) {
+            m.put(b.getLocalAddress(), 0);
+        }
+
+        // wait a couple of 100 msecs each for the bookies to come up and the bookieInfo to be retrieved by the client
+        ClientConfiguration conf = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true).
+            setBookieMaxWeightMultipleForWeightBasedPlacement(multiple);
+        Thread.sleep(100);
+        final BookKeeper client = new BookKeeper(conf);
+        Thread.sleep(100);
+        for (int i = 0; i < 2000; i++) {
+            LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
+            for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
+                m.put(b, m.get(b)+1);
+            }
+        }
+
+        // make sure that bookies with higher weight are chosen 3X as often as the median;
+        // since the number of ledgers is small (2000), there may be variation 
+        double ratio1 = (double)m.get(bs.get(numBookies-2).getLocalAddress())/(double)m.get(bs.get(0).getLocalAddress());
+        assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1-multiple), Math.abs(ratio1-multiple) < 1);
+        double ratio2 = (double)m.get(bs.get(numBookies-1).getLocalAddress())/(double)m.get(bs.get(1).getLocalAddress());
+        assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2-multiple), Math.abs(ratio2-multiple) < 1);
+
+        // Bring down the 2 bookies that had higher weight; after this the allocation to all
+        // the remaining bookies should be uniform
+        for (BookieServer b : bs) {
+            m.put(b.getLocalAddress(), 0);
+        }
+        BookieServer server1 = bs.get(numBookies-2);
+        BookieServer server2 = bs.get(numBookies-1);
+        killBookie(numBookies-1);
+        killBookie(numBookies-2);
+
+        // give some time for the cluster to become stable
+        Thread.sleep(100);
+        for (int i = 0; i < 2000; i++) {
+            LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
+            for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
+                m.put(b, m.get(b)+1);
+            }
+        }
+
+        // make sure that the remaining bookies are now chosen roughly uniformly (neighbouring counts within 30%)
+        for (int i=0; i < numBookies-3; i++) {
+            double delta = Math.abs((double)m.get(bs.get(i).getLocalAddress())-(double)m.get(bs.get(i+1).getLocalAddress()));
+            delta = (delta*100)/(double)m.get(bs.get(i+1).getLocalAddress());
+            assertTrue("Weigheted placement is not honored: " + delta, delta <= 30); // the deviation should be less than 30%
+        }
+        // since the following 2 bookies were down, they shouldn't ever be selected
+        assertTrue("Weigheted placement is not honored" + m.get(server1.getLocalAddress()),
+                m.get(server1.getLocalAddress()) == 0);
+        assertTrue("Weigheted placement is not honored" + m.get(server2.getLocalAddress()),
+                m.get(server2.getLocalAddress()) == 0);
+
+        client.close();
+    }
+
+    /**
+     * Test to show that weight based selection honors the disk weight of bookies and also adapts
+     * when bookies are added.
+     */
+    @Test(timeout=60000)
+    public void testDiskSpaceWeightedBookieSelectionWithBookiesBeingAdded() throws Exception {
+        long freeDiskSpace=1000000L;
+        int multiple=3;
+        for (int i=0; i < numBookies; i++) {
+            // all the bookies have freeDiskSpace of 1MB
+            replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace);
+        }
+        // let the last two bookies be down initially
+        ServerConfiguration conf1 = killBookie(numBookies-1);
+        ServerConfiguration conf2 = killBookie(numBookies-2);
+        Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>();
+        for (BookieServer b : bs) {
+            m.put(b.getLocalAddress(), 0);
+        }
+
+        // wait a bit for the bookies to come up and the bookieInfo to be retrieved by the client
+        ClientConfiguration conf = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true).
+            setBookieMaxWeightMultipleForWeightBasedPlacement(multiple);
+        Thread.sleep(100);
+        final BookKeeper client = new BookKeeper(conf);
+        Thread.sleep(100);
+        for (int i = 0; i < 2000; i++) {
+            LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
+            for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
+                m.put(b, m.get(b)+1);
+            }
+        }
+
+        // make sure that the bookies that are up, which all have the same free space, are chosen roughly uniformly;
+        // since the number of ledgers is small (2000), there may be variation
+        for (int i=0; i < numBookies-3; i++) {
+            double delta = Math.abs((double)m.get(bs.get(i).getLocalAddress())-(double)m.get(bs.get(i+1).getLocalAddress()));
+            delta = (delta*100)/(double)m.get(bs.get(i+1).getLocalAddress());
+            assertTrue("Weigheted placement is not honored: " + delta, delta <= 30); // the deviation should be less than 30%
+        }
+
+        // bring up the two dead bookies; they'll also have 3X more free space than the rest of the bookies
+        restartBookie(conf1, multiple*freeDiskSpace, multiple*freeDiskSpace, 0);
+        restartBookie(conf2, multiple*freeDiskSpace, multiple*freeDiskSpace, 0);
+
+        // give some time for the cluster to become stable
+        Thread.sleep(100);
+        for (BookieServer b : bs) {
+            m.put(b.getLocalAddress(), 0);
+        }
+        for (int i = 0; i < 2000; i++) {
+            LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
+            for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
+                m.put(b, m.get(b)+1);
+            }
+        }
+
+        // make sure that bookies with higher weight(the last 2 bookies) are chosen 3X as often as the median;
+        // since the number of ledgers created is small (2000), we allow a range of 2X to 4X instead of the exact 3X
+        for (int i=0; i < numBookies-2; i++) {
+            double ratio1 = (double)m.get(bs.get(numBookies-2).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress());
+            assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1-multiple), Math.abs(ratio1-multiple) < 1);
+            double ratio2 = (double)m.get(bs.get(numBookies-1).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress());
+            assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2-multiple), Math.abs(ratio2-multiple) < 1);
+        }
+        client.close();
+    }
+
+    /**
+     * Tests that the bookie selection is based on the amount of free disk space a bookie has. Also make sure that
+     * the periodic bookieInfo read is working and causes the new weights to be taken into account.
+     */
+    @Test(timeout=60000)
+    public void testDiskSpaceWeightedBookieSelectionWithPeriodicBookieInfoUpdate() throws Exception {
+        long freeDiskSpace=1000000L;
+        int multiple=3;
+        for (int i=0; i < numBookies; i++) {
+            // the first 8 bookies have freeDiskSpace of 1MB; the remaining 2 will advertise 1MB for
+            // the first 3 seconds but then they'll advertise 3MB after the first 3 seconds
+            if (i < numBookies-2) {
+                replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace);
+            } else {
+                replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace, multiple*freeDiskSpace, 2);
+            }
+        }
+        Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>();
+        for (BookieServer b : bs) {
+            m.put(b.getLocalAddress(), 0);
+        }
+
+        // the periodic bookieInfo is read once every 6 seconds (updateIntervalSecs)
+        int updateIntervalSecs = 6;
+        ClientConfiguration conf = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true).
+            setBookieMaxWeightMultipleForWeightBasedPlacement(multiple).
+            setGetBookieInfoIntervalSeconds(updateIntervalSecs, TimeUnit.SECONDS);
+        // wait a bit for the bookies to come up and the bookieInfo to be retrieved by the client
+        Thread.sleep(100);
+        final BookKeeper client = new BookKeeper(conf);
+        Thread.sleep(100);
+        long startMsecs = MathUtils.now();
+        for (int i = 0; i < 2000; i++) {
+            LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
+            for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
+                m.put(b, m.get(b)+1);
+            }
+        }
+        long elapsedMsecs = MathUtils.now() - startMsecs;
+
+        // make sure that all the bookies are chosen pretty much uniformly
+        int bookiesToCheck = numBookies-1;
+        if (elapsedMsecs > updateIntervalSecs*1000) {
+            // if this task took longer than updateIntervalSecs, the weight for the last 2 bookies will be
+            // higher, so skip checking them
+            bookiesToCheck = numBookies-3;
+        }
+        for (int i=0; i < bookiesToCheck; i++) {
+            double delta = Math.abs((double)m.get(bs.get(i).getLocalAddress())-(double)m.get(bs.get(i+1).getLocalAddress()));
+            delta = (delta*100)/(double)m.get(bs.get(i+1).getLocalAddress());
+            assertTrue("Weigheted placement is not honored: " + delta, delta <= 30); // the deviation should be <30%
+        }
+
+        if (elapsedMsecs < updateIntervalSecs*1000) {
+            // sleep until periodic bookie info retrieval kicks in and it gets the updated
+            // freeDiskSpace for the last 2 bookies
+            Thread.sleep(updateIntervalSecs*1000 - elapsedMsecs);
+        }
+
+        for (BookieServer b : bs) {
+            m.put(b.getLocalAddress(), 0);
+        }
+        for (int i = 0; i < 2000; i++) {
+            LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
+            for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
+                m.put(b, m.get(b)+1);
+            }
+        }
+
+        // make sure that bookies with higher weight(the last 2 bookies) are chosen 3X as often as the median;
+        // since the number of ledgers created is small (2000), we allow a range of 2X to 4X instead of the exact 3X
+        for (int i=0; i < numBookies-2; i++) {
+            double ratio1 = (double)m.get(bs.get(numBookies-2).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress());
+            assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1-multiple), Math.abs(ratio1-multiple) < 1);
+            double ratio2 = (double)m.get(bs.get(numBookies-1).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress());
+            assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2-multiple), Math.abs(ratio2-multiple) < 1);
+        }
+
+        client.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
new file mode 100644
index 0000000..ed41cb2
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -0,0 +1,141 @@
+package org.apache.bookkeeper.client;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookkeeperProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests that a GetBookieInfo request sent to a sleeping bookie fails with a timeout error.
+ *
+ */
+public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase {
+    private final static Logger LOG = LoggerFactory.getLogger(TestGetBookieInfoTimeout.class);
+    DigestType digestType;
+    public ClientSocketChannelFactory channelFactory;
+    public OrderedSafeExecutor executor;
+
+    public TestGetBookieInfoTimeout() {
+        super(10);
+        this.digestType = DigestType.CRC32;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
+                .newCachedThreadPool());
+        executor = OrderedSafeExecutor.newBuilder()
+                .name("BKClientOrderedSafeExecutor")
+                .numThreads(2)
+                .build();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        channelFactory.releaseExternalResources();
+        executor.shutdown();
+    }
+
+    @Test(timeout=60000)
+    public void testGetBookieInfoTimeout() throws Exception {
+
+        // connect to the bookies and create a ledger
+        LedgerHandle writelh = bkc.createLedger(3,3,digestType, "testPasswd".getBytes());
+        String tmp = "Foobar";
+        final int numEntries = 10;
+        for (int i = 0; i < numEntries; i++) {
+            writelh.addEntry(tmp.getBytes());
+        }
+
+        // set timeout for getBookieInfo to be 2 secs and cause one of the bookies to go to sleep for 3X that time
+        ClientConfiguration cConf = new ClientConfiguration();
+        cConf.setGetBookieInfoTimeout(2);
+
+        final BookieSocketAddress bookieToSleep = writelh.getLedgerMetadata().getEnsemble(0).get(0);
+        int sleeptime = cConf.getBookieInfoTimeout()*3;
+        CountDownLatch latch = sleepBookie(bookieToSleep, sleeptime);
+        latch.await();
+
+        // try to get bookie info from the sleeping bookie. It should fail with timeout error
+        BookieSocketAddress addr = new BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(),
+                bookieToSleep.getPort());
+        BookieClient bc = new BookieClient(cConf, channelFactory, executor);
+        long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE |
+                BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
+
+        class CallbackObj {
+            int rc;
+            long requested;
+            @SuppressWarnings("unused")
+            long freeDiskSpace, totalDiskCapacity;
+            CountDownLatch latch = new CountDownLatch(1);
+            CallbackObj(long requested) {
+                this.requested = requested;
+                this.rc = 0;
+                this.freeDiskSpace = 0L;
+                this.totalDiskCapacity = 0L;
+            }
+        };
+        CallbackObj obj = new CallbackObj(flags);
+        bc.getBookieInfo(addr, flags, new GetBookieInfoCallback() {
+            @Override
+            public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) {
+                CallbackObj obj = (CallbackObj)ctx;
+                obj.rc=rc;
+                if (rc == Code.OK) {
+                    if ((obj.requested & BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) {
+                        obj.freeDiskSpace = bInfo.getFreeDiskSpace();
+                    }
+                    if ((obj.requested & BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE) != 0) {
+                        obj.totalDiskCapacity = bInfo.getTotalDiskSpace();
+                    }
+                }
+                obj.latch.countDown();
+            }
+
+        }, obj);
+        obj.latch.await();
+        LOG.debug("Return code: " + obj.rc);
+        assertTrue("GetBookieInfo failed with unexpected error code: " + obj.rc, obj.rc == Code.TimeoutException);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index bef6bc2..6739ea4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -33,6 +34,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import junit.framework.TestCase;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
@@ -420,6 +422,252 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
     }
 
+    @Test(timeout = 60000)
+    public void testWeightedPlacementAndReplaceBookieWithEnoughBookiesInSameRack() throws Exception {
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
+                NetworkTopology.DEFAULT_RACK);
+        StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2");
+        StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2");
+        StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2");
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+
+        int multiple = 10;
+        conf.setDiskWeightBasedPlacementEnabled(true);
+        conf.setBookieMaxWeightMultipleForWeightBasedPlacement(-1); // no max cap on weight
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>absent(), timer, DISABLE_ALL, null);
+
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>();
+        bookieInfoMap.put(addr1, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr2, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr3, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr4, new BookieInfo(multiple*100L, multiple*100L));
+        repp.updateBookieInfo(bookieInfoMap);
+
+        Map<BookieSocketAddress, Long> selectionCounts = new HashMap<BookieSocketAddress, Long>();
+        selectionCounts.put(addr3, 0L);
+        selectionCounts.put(addr4, 0L);
+        int numTries = 50000;
+        BookieSocketAddress replacedBookie;
+        for (int i = 0; i < numTries; i++) {
+            // replace node under r2
+            replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, new HashSet<BookieSocketAddress>());
+            assertTrue(addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
+            selectionCounts.put(replacedBookie, selectionCounts.get(replacedBookie)+1);
+        }
+        double observedMultiple = ((double)selectionCounts.get(addr4)/(double)selectionCounts.get(addr3));
+        assertTrue("Weights not being honored " + observedMultiple, Math.abs(observedMultiple-multiple) < 1);
+    }
+
+    @Test(timeout = 60000)
+    public void testWeightedPlacementAndReplaceBookieWithoutEnoughBookiesInSameRack() throws Exception {
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+        // update dns mapping
+        StaticDNSResolver.reset();
+        StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_RACK);
+        StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2");
+        StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r3");
+        StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r4");
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+
+        int multiple = 10, maxMultiple = 4;
+        conf.setDiskWeightBasedPlacementEnabled(true);
+        conf.setBookieMaxWeightMultipleForWeightBasedPlacement(maxMultiple);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>absent(), timer, DISABLE_ALL, null);
+
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>();
+        bookieInfoMap.put(addr1, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr2, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr3, new BookieInfo(200L, 200L));
+        bookieInfoMap.put(addr4, new BookieInfo(multiple*100L, multiple*100L));
+        repp.updateBookieInfo(bookieInfoMap);
+
+        Map<BookieSocketAddress, Long> selectionCounts = new HashMap<BookieSocketAddress, Long>();
+        selectionCounts.put(addr1, 0L);
+        selectionCounts.put(addr2, 0L);
+        selectionCounts.put(addr3, 0L);
+        selectionCounts.put(addr4, 0L);
+        int numTries = 50000;
+        BookieSocketAddress replacedBookie;
+        for (int i = 0; i < numTries; i++) {
+            // addr2 is on /r2 and this is the only one on this rack. So the replacement
+            // will come from other racks. However, the weight should be honored in such
+            // selections as well
+            replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, new HashSet<BookieSocketAddress>());
+            assertTrue(addr1.equals(replacedBookie) || addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
+            selectionCounts.put(replacedBookie, selectionCounts.get(replacedBookie)+1);
+        }
+
+        double medianWeight = 150;
+        double medianSelectionCounts = (double)(medianWeight/bookieInfoMap.get(addr1).getWeight())*selectionCounts.get(addr1);
+        double observedMultiple1 = ((double)selectionCounts.get(addr4)/(double)medianSelectionCounts);
+        double observedMultiple2 = ((double)selectionCounts.get(addr4)/(double)selectionCounts.get(addr3));
+        LOG.info("oM1 " + observedMultiple1 + " oM2 " + observedMultiple2);
+        assertTrue("Weights not being honored expected " + maxMultiple + " observed " + observedMultiple1,
+                Math.abs(observedMultiple1-maxMultiple) < 1);
+        double expected = (medianWeight*maxMultiple)/bookieInfoMap.get(addr3).getWeight();// expected multiple for addr3
+        assertTrue("Weights not being honored expected " + expected + " observed " + observedMultiple2,
+                Math.abs(observedMultiple2-expected) < 1);
+    }
+
+    @Test(timeout = 60000)
+    public void testWeightedPlacementAndNewEnsembleWithEnoughBookiesInSameRack() throws Exception {
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+        BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
+        BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
+        BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
+        BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181);
+        BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181);
+
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_RACK);
+        StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2");
+        StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2");
+        StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2");
+        StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2");
+        StaticDNSResolver.addNodeToRack(addr6.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r3");
+        StaticDNSResolver.addNodeToRack(addr7.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r3");
+        StaticDNSResolver.addNodeToRack(addr8.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r3");
+        StaticDNSResolver.addNodeToRack(addr9.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r3");
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        addrs.add(addr5);
+        addrs.add(addr6);
+        addrs.add(addr7);
+        addrs.add(addr8);
+        addrs.add(addr9);
+
+        int maxMultiple = 4;
+        conf.setDiskWeightBasedPlacementEnabled(true);
+        conf.setBookieMaxWeightMultipleForWeightBasedPlacement(maxMultiple);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>absent(), timer, DISABLE_ALL, null);
+
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>();
+        bookieInfoMap.put(addr1, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr2, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr3, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr4, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr5, new BookieInfo(1000L, 1000L));
+        bookieInfoMap.put(addr6, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr7, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr8, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr9, new BookieInfo(1000L, 1000L));
+
+        repp.updateBookieInfo(bookieInfoMap);
+
+        Map<BookieSocketAddress, Long> selectionCounts = new HashMap<BookieSocketAddress, Long>();
+        for (BookieSocketAddress b : addrs) {
+            selectionCounts.put(b, 0L);
+        }
+        int numTries = 10000;
+
+        Set<BookieSocketAddress> excludeList = new HashSet<BookieSocketAddress>();
+        ArrayList<BookieSocketAddress> ensemble;
+        for (int i = 0; i < numTries; i++) {
+            // addr2 is on /r2 and this is the only one on this rack. So the replacement
+            // will come from other racks. However, the weight should be honored in such
+            // selections as well
+            ensemble = repp.newEnsemble(3, 2, 2, null, excludeList);
+            assertTrue("Rackaware selection not happening " + getNumCoveredWriteQuorums(ensemble, 2), getNumCoveredWriteQuorums(ensemble, 2) >= 2);
+            for (BookieSocketAddress b : ensemble) {
+                selectionCounts.put(b, selectionCounts.get(b)+1);
+            }
+        }
+
+        // the median weight used is 100 since addr2 and addr6 have the same weight, we use their
+        // selection counts as the same as median
+        double observedMultiple1 = ((double)selectionCounts.get(addr5)/(double)selectionCounts.get(addr2));
+        double observedMultiple2 = ((double)selectionCounts.get(addr9)/(double)selectionCounts.get(addr6));
+        assertTrue("Weights not being honored expected 2 observed " + observedMultiple1,
+                Math.abs(observedMultiple1-maxMultiple) < 0.5);
+        assertTrue("Weights not being honored expected 4 observed " + observedMultiple2,
+                Math.abs(observedMultiple2-maxMultiple) < 0.5);
+    }
+
+    @Test(timeout = 60000)
+    public void testWeightedPlacementAndNewEnsembleWithoutEnoughBookies() throws Exception {
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+        BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
+        // Scenario: five bookies, four of which are excluded below; a (3, 2, 2) request must then fail while a (1, 1, 1) request must still succeed.
+        // update dns mapping: addr1 on the default rack, addr2/addr3 on /r2, addr4/addr5 on /r3
+        StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_RACK);
+        StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2");
+        StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2");
+        StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r3");
+        StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r3");
+        // Update cluster: all five bookies are handed to onClusterChanged() below
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        addrs.add(addr5);
+
+        int maxMultiple = 4;
+        conf.setDiskWeightBasedPlacementEnabled(true);
+        conf.setBookieMaxWeightMultipleForWeightBasedPlacement(maxMultiple);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>absent(), timer, DISABLE_ALL, null);
+
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>();
+        bookieInfoMap.put(addr1, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr2, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr3, new BookieInfo(1000L, 1000L)); // addr3 and addr5 get values 10x larger than the other bookies
+        bookieInfoMap.put(addr4, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr5, new BookieInfo(1000L, 1000L));
+
+        repp.updateBookieInfo(bookieInfoMap);
+
+        ArrayList<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
+        Set<BookieSocketAddress> excludeList = new HashSet<BookieSocketAddress>();
+        try {
+            excludeList.add(addr1);
+            excludeList.add(addr2);
+            excludeList.add(addr3);
+            excludeList.add(addr4);
+            ensemble = repp.newEnsemble(3, 2, 2, null, excludeList);
+            fail("Should throw BKNotEnoughBookiesException when there is not enough bookies" + ensemble);
+        } catch (BKNotEnoughBookiesException e) {
+            // this is expected: with addr1..addr4 excluded only addr5 is left, which cannot satisfy the (3, 2, 2) request
+        }
+        try {
+            ensemble = repp.newEnsemble(1, 1, 1, null, excludeList); // same exclude list; addr5 is the only bookie not excluded
+        } catch (BKNotEnoughBookiesException e) {
+            fail("Should not throw BKNotEnoughBookiesException when there are enough bookies for the ensemble");
+        }
+    }
+
     private int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> ensemble, int writeQuorumSize)
             throws Exception {
         int ensembleSize = ensemble.size();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java
new file mode 100644
index 0000000..037ac80
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestWeightedRandomSelection {
+
+    static final Logger LOG = LoggerFactory.getLogger(TestWeightedRandomSelection.class);
+
+    static class TestObj implements WeightedObject { // WeightedObject stub whose weight is the value given at construction
+        long val;
+        TestObj(long value) {
+            this.val = value;
+        }
+        @Override
+        public long getWeight() {
+            return val;
+        }
+    }
+
+    WeightedRandomSelection<String> wRS;
+    Configuration conf = new CompositeConfiguration();
+    int multiplier = 3; // max-probability multiplier; every test that uses it reassigns it first
+
+    @Before
+    public void setUp() throws Exception {
+        wRS = new WeightedRandomSelection<String>(); // fresh selector for every test
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @Test(timeout = 60000)
+    public void testSelectionWithEqualWeights() throws Exception {
+        Map<String, WeightedObject> map = new HashMap<String, WeightedObject>();
+
+        Long val=100L;
+        int numKeys = 50,  totalTries = 1000000;
+        Map<String, Integer> randomSelection = new HashMap<String, Integer>();
+        for (Integer i=0; i < numKeys; i++) {
+            map.put(i.toString(), new TestObj(val));
+            randomSelection.put(i.toString(), 0);
+        }
+
+        wRS.updateMap(map);
+        for (int i = 0; i < totalTries; i++) {
+            String key = wRS.getNextRandom();
+            randomSelection.put(key, randomSelection.get(key)+1);
+        }
+
+        // all 50 keys carry the same weight, so each should be picked about 1/numKeys of the time
+        double expectedPct = ((double)1/(double)numKeys)*100;
+        for (Map.Entry<String, Integer> e : randomSelection.entrySet()) {
+            double actualPct = ((double)e.getValue()/(double)totalTries)*100;
+            double delta = (Math.abs(expectedPct-actualPct)/expectedPct)*100;
+            System.out.println("Key:" + e.getKey() + " Value:" + e.getValue() + " Expected: " + expectedPct + " Actual: " + actualPct);
+            // relative deviation (in percent of the expected share) must stay below 5
+            assertTrue("Not doing uniform selection when weights are equal", delta < 5);
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testSelectionWithAllZeroWeights() throws Exception {
+        Map<String, WeightedObject> map = new HashMap<String, WeightedObject>();
+
+        int numKeys = 50,  totalTries = 1000000;
+        Map<String, Integer> randomSelection = new HashMap<String, Integer>();
+        for (Integer i=0; i < numKeys; i++) {
+            map.put(i.toString(), new TestObj(0L));
+            randomSelection.put(i.toString(), 0);
+        }
+
+        wRS.updateMap(map);
+        for (int i = 0; i < totalTries; i++) {
+            String key = wRS.getNextRandom();
+            randomSelection.put(key, randomSelection.get(key)+1);
+        }
+
+        // when all the values are zeros, there should be uniform distribution
+        double expectedPct = ((double)1/(double)numKeys)*100;
+        for (Map.Entry<String, Integer> e : randomSelection.entrySet()) {
+            double actualPct = ((double)e.getValue()/(double)totalTries)*100;
+            double delta = (Math.abs(expectedPct-actualPct)/expectedPct)*100;
+            System.out.println("Key:" + e.getKey() + " Value:" + e.getValue() + " Expected: " + expectedPct + " Actual: " + actualPct);
+            // relative deviation (in percent of the expected share) must stay below 5
+            assertTrue("Not doing uniform selection when weights are equal", delta < 5);
+        }
+    }
+    // Checks that each key's observed selection share, expressed as a multiple of the median, is within 5% of its expected (optionally capped) weight share.
+    void verifyResult(Map<String, WeightedObject> map, Map<String, Integer> randomSelection, int multiplier,
+            long minWeight, long medianWeight, long totalWeight, int totalTries) {
+        List<Integer> values = new ArrayList<Integer>(randomSelection.values());
+        Collections.sort(values);
+        double medianObserved, medianObservedWeight, medianExpectedWeight;
+        int mid = values.size()/2;
+        if ((values.size() % 2) == 1) {
+            medianObserved = values.get(mid);
+        } else {
+            medianObserved = (double)(values.get(mid-1) + values.get(mid))/2;
+        }
+
+        medianObservedWeight = (double)medianObserved/(double)totalTries;
+        medianExpectedWeight = (double)medianWeight/totalWeight;
+
+        for (Map.Entry<String, Integer> e : randomSelection.entrySet()) {
+            double observed = (((double)e.getValue()/(double)totalTries));
+
+            double expected;
+            if (map.get(e.getKey()).getWeight() == 0) {
+                // a key with weight 0 is expected to be selected as if it had the caller-supplied minWeight
+                expected = (double)minWeight/(double)totalWeight;
+            } else {
+                expected = (double)map.get(e.getKey()).getWeight()/(double)totalWeight;
+            }
+            if (multiplier > 0 && expected > multiplier*medianExpectedWeight) {
+                expected = multiplier*medianExpectedWeight;
+            }
+            // We can't compare these weights directly because they are derived from
+            // different totals. But if we express each as a multiple of the median of its
+            // own series (expected vs. observed), then they should be comparable
+            double expectedMultiple = expected/medianExpectedWeight;
+            double observedMultiple = observed/medianObservedWeight;
+            double delta = (Math.abs(expectedMultiple-observedMultiple)/expectedMultiple)*100;
+            System.out.println("Key:" + e.getKey() + " Value:" + e.getValue()
+                    + " Expected " + expectedMultiple + " actual " + observedMultiple + " delta " + delta + "%");
+
+            // the observed multiple should be within 5% of the expected multiple
+            assertTrue("Observed selection multiple is not within 5% of the expected weighted multiple", delta < 5);
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testSelectionWithSomeZeroWeights() throws Exception {
+        Map<String, WeightedObject> map = new HashMap<String, WeightedObject>();
+        Map<String, Integer> randomSelection = new HashMap<String, Integer>();
+        int numKeys = 50;
+        multiplier=3;
+        long val=0L, total=0L, minWeight = 100L, medianWeight=minWeight;
+        wRS.setMaxProbabilityMultiplier(multiplier);
+        for (Integer i=0; i < numKeys; i++) {
+            if (i < numKeys/3) {
+                val = 0L;
+            } else if (i < 2*(numKeys/3)){
+                val = minWeight;
+            } else {
+                val = 2*minWeight;
+            }
+            total += val;
+            map.put(i.toString(), new TestObj(val));
+            randomSelection.put(i.toString(), 0);
+        }
+
+        wRS.updateMap(map);
+        int totalTries = 10000000;
+        for (int i = 0; i < totalTries; i++) {
+            String key = wRS.getNextRandom();
+            randomSelection.put(key, randomSelection.get(key)+1);
+        }
+        verifyResult(map, randomSelection, multiplier, minWeight, medianWeight, total, totalTries);
+    }
+
+    @Test(timeout = 60000)
+    public void testSelectionWithUnequalWeights() throws Exception {
+        Map<String, WeightedObject> map = new HashMap<String, WeightedObject>();
+        Map<String, Integer> randomSelection = new HashMap<String, Integer>();
+        int numKeys = 50;
+        multiplier=4;
+        long val=0L, total=0L, minWeight=100L, medianWeight=2*minWeight;
+        wRS.setMaxProbabilityMultiplier(multiplier);
+        for (Integer i=0; i < numKeys; i++) {
+            if (i < numKeys/3) {
+                val = minWeight;
+            } else if (i < 2*(numKeys/3)){
+                val = 2*minWeight;
+            } else {
+                val = 10*minWeight;
+            }
+            total += val;
+            map.put(i.toString(), new TestObj(val));
+            randomSelection.put(i.toString(), 0);
+        }
+
+        wRS.updateMap(map);
+        int totalTries = 10000000;
+        for (int i = 0; i < totalTries; i++) {
+            String key = wRS.getNextRandom();
+            randomSelection.put(key, randomSelection.get(key)+1);
+        }
+        verifyResult(map, randomSelection, multiplier, minWeight, medianWeight, total, totalTries);
+    }
+
+    @Test(timeout = 60000)
+    public void testSelectionWithHotNode() throws Exception {
+        Map<String, WeightedObject> map = new HashMap<String, WeightedObject>();
+        Map<String, Integer> randomSelection = new HashMap<String, Integer>();
+
+        multiplier=3; // NOTE: a max multiplier of 3 IS applied here; setup is identical to testSelectionWithHotNodeWithLimit
+        int numKeys = 50;
+        long total=0L, minWeight = 100L, val = minWeight, medianWeight=minWeight;
+        wRS.setMaxProbabilityMultiplier(multiplier);
+        for (Integer i=0; i < numKeys; i++) {
+            if (i == numKeys-1) {
+                // last one has 10X more weight than the rest put together
+                val=10*(numKeys-1)*100L;
+            }
+            total += val;
+            map.put(i.toString(), new TestObj(val));
+            randomSelection.put(i.toString(), 0);
+        }
+
+        wRS.updateMap(map);
+        int totalTries = 10000000;
+        for (int i = 0; i < totalTries; i++) {
+            String key = wRS.getNextRandom();
+            randomSelection.put(key, randomSelection.get(key)+1);
+        }
+        verifyResult(map, randomSelection, multiplier, minWeight, medianWeight, total, totalTries);
+    }
+
+    @Test(timeout = 60000)
+    public void testSelectionWithHotNodeWithLimit() throws Exception {
+        Map<String, WeightedObject> map = new HashMap<String, WeightedObject>();
+        Map<String, Integer> randomSelection = new HashMap<String, Integer>();
+
+        multiplier=3; // limit the max load on hot node to be 3X
+        int numKeys = 50;
+        long total=0L, minWeight = 100L, val = minWeight, medianWeight=minWeight;
+        wRS.setMaxProbabilityMultiplier(multiplier);
+        for (Integer i=0; i < numKeys; i++) {
+            if (i == numKeys-1) {
+                // last one has 10X more weight than the rest put together
+                val=10*(numKeys-1)*100L;
+            }
+            total += val;
+            map.put(i.toString(), new TestObj(val));
+            randomSelection.put(i.toString(), 0);
+        }
+
+        wRS.updateMap(map);
+        int totalTries = 10000000;
+        for (int i = 0; i < totalTries; i++) {
+            String key = wRS.getNextRandom();
+            randomSelection.put(key, randomSelection.get(key)+1);
+        }
+        verifyResult(map, randomSelection, multiplier, minWeight, medianWeight, total, totalTries);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 7760827..0698780 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -22,11 +22,15 @@ package org.apache.bookkeeper.test;
  */
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -34,6 +38,8 @@ import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
@@ -250,4 +256,48 @@ public class BookieClientTest {
             assertEquals(BKException.Code.NoSuchLedgerExistsException, arc.rc);
         }
     }
+
+    @Test(timeout=60000)
+    public void testGetBookieInfo() throws IOException, InterruptedException {
+        BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
+        BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor);
+        long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE |
+                BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
+
+        class CallbackObj { // carries the request flags in and the callback's results out
+            int rc;
+            long requested;
+            long freeDiskSpace, totalDiskCapacity;
+            CountDownLatch latch = new CountDownLatch(1);
+            CallbackObj(long requested) {
+                this.requested = requested;
+                this.rc = 0;
+                this.freeDiskSpace = 0L;
+                this.totalDiskCapacity = 0L;
+            }
+        }
+        CallbackObj obj = new CallbackObj(flags);
+        bc.getBookieInfo(addr, flags, new GetBookieInfoCallback() {
+            @Override
+            public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) {
+                CallbackObj obj = (CallbackObj)ctx;
+                obj.rc=rc;
+                if (rc == Code.OK) {
+                    // copy out only the fields whose flag was set in the request
+                    if ((obj.requested & BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) {
+                        obj.freeDiskSpace = bInfo.getFreeDiskSpace();
+                    }
+                    if ((obj.requested & BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE) != 0) {
+                        obj.totalDiskCapacity = bInfo.getTotalDiskSpace();
+                    }
+                }
+                obj.latch.countDown();
+            }
+        }, obj);
+        obj.latch.await(); // wait for the callback; the @Test timeout bounds this wait
+        System.out.println("Return code: " + obj.rc + " FreeDiskSpace: " + obj.freeDiskSpace + " TotalCapacity: " + obj.totalDiskCapacity);
+        assertTrue("GetBookieInfo failed with error " + obj.rc, obj.rc == Code.OK);
+        assertTrue("Free disk space " + obj.freeDiskSpace + " exceeds total capacity " + obj.totalDiskCapacity, obj.freeDiskSpace <= obj.totalDiskCapacity);
+        assertTrue("Total disk capacity should be positive but was " + obj.totalDiskCapacity, obj.totalDiskCapacity > 0);
+    }
 }


Mime
View raw message