bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-634: Provide admin tool to rename bookie identifier in ledger metadata (rakeshr via ivank)
Date Tue, 13 Jan 2015 14:40:32 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/branch-4.3 09714f820 -> d17c46fc7


BOOKKEEPER-634: Provide admin tool to rename bookie identifier in ledger metadata (rakeshr
via ivank)


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/d17c46fc
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/d17c46fc
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/d17c46fc

Branch: refs/heads/branch-4.3
Commit: d17c46fc79da130e3738baaa74af9c99f1e234f2
Parents: 09714f8
Author: Ivan Kelly <ivank@apache.org>
Authored: Tue Jan 13 13:06:18 2015 +0100
Committer: Ivan Kelly <ivank@apache.org>
Committed: Tue Jan 13 15:40:16 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/bookkeeper/bookie/BookieShell.java   | 144 ++++++++-
 .../bookkeeper/client/UpdateLedgerOp.java       | 246 +++++++++++++++
 .../bookkeeper/client/UpdateLedgerCmdTest.java  | 133 ++++++++
 .../bookkeeper/client/UpdateLedgerOpTest.java   | 305 +++++++++++++++++++
 .../test/BookKeeperClusterTestCase.java         |   2 +-
 6 files changed, 824 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d17c46fc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59b9e9e..1e37013 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -28,6 +28,8 @@ Release 4.3.1 - unreleased
 
       BOOKKEEPER-811: Recovery tool doesn't remove cookie after recovering one bookie (Charles
Xie via sijie)
 
+      BOOKKEEPER-634: Provide admin tool to rename bookie identifier in ledger metadata (rakeshr
via ivank)
+
       bookkeeper-client:
 
         BOOKKEEPER-810: Allow to configure TCP connect timeout (Charles Xie via sijie)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d17c46fc/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index a1e4639..f1b57df 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.client.UpdateLedgerOp;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -50,6 +51,7 @@ import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.util.EntryFormatter;
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.Tool;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.Version;
@@ -101,6 +103,7 @@ public class BookieShell implements Tool {
     static final String CMD_AUTORECOVERY = "autorecovery";
     static final String CMD_LISTBOOKIES = "listbookies";
     static final String CMD_UPDATECOOKIE = "updatecookie";
+    static final String CMD_UPDATELEDGER = "updateledgers";
     static final String CMD_HELP = "help";
 
     final ServerConfiguration bkConf = new ServerConfiguration();
@@ -1163,13 +1166,6 @@ public class BookieShell implements Tool {
             return 0;
         }
 
-        private boolean getOptionalValue(String optValue, String optName) {
-            if (StringUtils.equals(optValue, optName)) {
-                return true;
-            }
-            return false;
-        }
-
         private boolean verifyCookie(Cookie oldCookie, File dir) throws IOException {
             try {
                 Cookie cookie = Cookie.readFromDirectory(dir);
@@ -1181,6 +1177,124 @@ public class BookieShell implements Tool {
         }
     }
 
+    /**
+     * Update ledger command
+     */
+    class UpdateLedgerCmd extends MyCommand {
+        private final Options opts = new Options();
+
+        UpdateLedgerCmd() {
+            super(CMD_UPDATELEDGER);
+            opts.addOption("b", "bookieId", true, "Bookie Id");
+            opts.addOption("s", "updatespersec", true, "Number of ledgers updating per second
(default: 5 per sec)");
+            opts.addOption("l", "limit", true, "Maximum number of ledgers to update (default:
no limit)");
+            opts.addOption("v", "verbose", true, "Print status of the ledger updation (default:
false)");
+            opts.addOption("p", "printprogress", true,
+                    "Print messages on every configured seconds if verbose turned on (default:
10 secs)");
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Update bookie id in ledgers (this may take a long time)";
+        }
+
+        @Override
+        String getUsage() {
+            return "updateledger -bookieId <hostname|ip> [-updatespersec N] [-limit
N] [-verbose true/false] [-printprogress N]";
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            final String bookieId = cmdLine.getOptionValue("bookieId");
+            if (StringUtils.isBlank(bookieId)) {
+                LOG.error("Invalid argument list!");
+                this.printUsage();
+                return -1;
+            }
+            if (!StringUtils.equals(bookieId, "hostname") && !StringUtils.equals(bookieId,
"ip")) {
+                LOG.error("Invalid option value {} for bookieId, expected hostname/ip", bookieId);
+                this.printUsage();
+                return -1;
+            }
+            boolean useHostName = getOptionalValue(bookieId, "hostname");
+            if (!bkConf.getUseHostNameAsBookieID() && useHostName) {
+                LOG.error("Expects configuration useHostNameAsBookieID=true as the option
value passed is 'hostname'");
+                return -1;
+            } else if (bkConf.getUseHostNameAsBookieID() && !useHostName) {
+                LOG.error("Expects configuration useHostNameAsBookieID=false as the option
value passed is 'ip'");
+                return -1;
+            }
+            final int rate = getOptionIntValue(cmdLine, "updatespersec", 5);
+            if (rate <= 0) {
+                LOG.error("Invalid updatespersec {}, should be > 0", rate);
+                return -1;
+            }
+            final int limit = getOptionIntValue(cmdLine, "limit", Integer.MIN_VALUE);
+            if (limit <= 0 && limit != Integer.MIN_VALUE) {
+                LOG.error("Invalid limit {}, should be > 0", limit);
+                return -1;
+            }
+            final boolean verbose = getOptionBooleanValue(cmdLine, "verbose", false);
+            final long printprogress;
+            if (!verbose) {
+                if (cmdLine.hasOption("printprogress")) {
+                    LOG.warn("Ignoring option 'printprogress', this is applicable when 'verbose'
is true");
+                }
+                printprogress = Integer.MIN_VALUE;
+            } else {
+                // defaulting to 10 seconds
+                printprogress = getOptionLongValue(cmdLine, "printprogress", 10);
+            }
+            final ClientConfiguration conf = new ClientConfiguration();
+            conf.addConfiguration(bkConf);
+            final BookKeeper bk = new BookKeeper(conf);
+            final BookKeeperAdmin admin = new BookKeeperAdmin(conf);
+            final UpdateLedgerOp updateLedgerOp = new UpdateLedgerOp(bk, admin);
+            final ServerConfiguration serverConf = new ServerConfiguration(bkConf);
+            final BookieSocketAddress newBookieId = Bookie.getBookieAddress(serverConf);
+            serverConf.setUseHostNameAsBookieID(!useHostName);
+            final BookieSocketAddress oldBookieId = Bookie.getBookieAddress(serverConf);
+
+            UpdateLedgerNotifier progressable = new UpdateLedgerNotifier() {
+                long lastReport = System.nanoTime();
+
+                @Override
+                public void progress(long updated, long issued) {
+                    if (printprogress <= 0) {
+                        return; // disabled
+                    }
+                    if (TimeUnit.MILLISECONDS.toSeconds(MathUtils.elapsedMSec(lastReport))
>= printprogress) {
+                        LOG.info("Number of ledgers issued={}, updated={}", issued, updated);
+                        lastReport = MathUtils.nowInNano();
+                    }
+                }
+            };
+            try {
+                updateLedgerOp.updateBookieIdInLedgers(oldBookieId, newBookieId, rate, limit,
progressable);
+            } catch (BKException e) {
+                LOG.error("Failed to update ledger metadata", e);
+                return -1;
+            } catch (IOException e) {
+                LOG.error("Failed to update ledger metadata", e);
+                return -1;
+            }
+            return 0;
+        }
+
+    }
+
+    /**
+     * A facility for reporting update ledger progress.
+     */
+    public interface UpdateLedgerNotifier {
+        void progress(long updated, long issued);
+    }
+
     final Map<String, MyCommand> commands = new HashMap<String, MyCommand>();
     {
         commands.put(CMD_METAFORMAT, new MetaFormatCmd());
@@ -1198,6 +1312,7 @@ public class BookieShell implements Tool {
         commands.put(CMD_AUTORECOVERY, new AutoRecoveryCmd());
         commands.put(CMD_LISTBOOKIES, new ListBookiesCmd());
         commands.put(CMD_UPDATECOOKIE, new UpdateCookieCmd());
+        commands.put(CMD_UPDATELEDGER, new UpdateLedgerCmd());
         commands.put(CMD_HELP, new HelpCmd());
     }
 
@@ -1578,4 +1693,19 @@ public class BookieShell implements Tool {
         }
         return defaultVal;
     }
+
+    private static boolean getOptionBooleanValue(CommandLine cmdLine, String option, boolean
defaultVal) {
+        if (cmdLine.hasOption(option)) {
+            String val = cmdLine.getOptionValue(option);
+            return Boolean.parseBoolean(val);
+        }
+        return defaultVal;
+    }
+
+    private static boolean getOptionalValue(String optValue, String optName) {
+        if (StringUtils.equals(optValue, optName)) {
+            return true;
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d17c46fc/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
new file mode 100644
index 0000000..9d9f331
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.bookie.BookieShell.UpdateLedgerNotifier;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Encapsulates updating the ledger metadata operation
+ */
+public class UpdateLedgerOp {
+
+    private final static Logger LOG = LoggerFactory.getLogger(UpdateLedgerOp.class);
+    private final BookKeeper bkc;
+    private final BookKeeperAdmin admin;
+
+    public UpdateLedgerOp(final BookKeeper bkc, final BookKeeperAdmin admin) {
+        this.bkc = bkc;
+        this.admin = admin;
+    }
+
+    /**
+     * Update the bookie id present in the ledger metadata.
+     *
+     * @param oldBookieId
+     *            current bookie id
+     * @param newBookieId
+     *            new bookie id
+     * @param rate
+     *            number of ledgers updating per second (default 5 per sec)
+     * @param limit
+     *            maximum number of ledgers to update (default: no limit). Stop
+     *            update if reaching limit
+     * @param progressable
+     *            report progress of the ledger updates
+     * @throws IOException
+     *             if there is an error when updating bookie id in ledger
+     *             metadata
+     * @throws InterruptedException
+     *             interrupted exception when update ledger meta
+     */
+    public void updateBookieIdInLedgers(final BookieSocketAddress oldBookieId, final BookieSocketAddress
newBookieId,
+            final int rate, final int limit, final UpdateLedgerNotifier progressable) throws
BKException, IOException {
+
+        final ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat("UpdateLedgerThread").setDaemon(true);
+        final ExecutorService executor = Executors.newSingleThreadExecutor(tfb.build());
+        final AtomicInteger issuedLedgerCnt = new AtomicInteger();
+        final AtomicInteger updatedLedgerCnt = new AtomicInteger();
+        final Future<?> updateBookieCb = executor.submit(new Runnable() {
+
+            @Override
+            public void run() {
+                updateLedgers(oldBookieId, newBookieId, rate, limit, progressable);
+            }
+
+            private void updateLedgers(final BookieSocketAddress oldBookieId, final BookieSocketAddress
newBookieId,
+                    final int rate, final int limit, final UpdateLedgerNotifier progressable)
{
+                try {
+                    final AtomicBoolean stop = new AtomicBoolean(false);
+                    final Set<Long> outstandings = Collections.newSetFromMap(new ConcurrentHashMap<Long,
Boolean>());
+                    final RateLimiter throttler = RateLimiter.create(rate);
+                    final Iterator<Long> ledgerItr = admin.listLedgers().iterator();
+                    final CountDownLatch syncObj = new CountDownLatch(1);
+
+                    // iterate through all the ledgers
+                    while (ledgerItr.hasNext() && !stop.get()) {
+                        // throttler to control updates per second
+                        throttler.acquire();
+
+                        final Long lId = ledgerItr.next();
+                        final ReadLedgerMetadataCb readCb = new ReadLedgerMetadataCb(bkc,
lId, oldBookieId, newBookieId);
+                        outstandings.add(lId);
+
+                        FutureCallback<Void> updateLedgerCb = new UpdateLedgerCb(lId,
stop, issuedLedgerCnt,
+                                updatedLedgerCnt, outstandings, syncObj, progressable);
+                        Futures.addCallback(readCb, updateLedgerCb);
+
+                        issuedLedgerCnt.incrementAndGet();
+                        if (limit != Integer.MIN_VALUE && issuedLedgerCnt.get() >=
limit || !ledgerItr.hasNext()) {
+                            stop.set(true);
+                        }
+                        bkc.getLedgerManager().readLedgerMetadata(lId, readCb);
+                    }
+                    // waiting till all the issued ledgers are finished
+                    syncObj.await();
+                } catch (IOException ioe) {
+                    LOG.error("Exception while updating ledger", ioe);
+                    throw new RuntimeException("Exception while updating ledger", ioe.getCause());
+                } catch (InterruptedException ie) {
+                    LOG.error("Exception while updating ledger metadata", ie);
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Exception while updating ledger", ie.getCause());
+                }
+            }
+        });
+        try {
+            // Wait to finish the issued ledgers.
+            updateBookieCb.get();
+        } catch (ExecutionException ee) {
+            throw new IOException("Exception while updating ledger", ee);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Exception while updating ledger", ie);
+        } finally {
+            executor.shutdown();
+        }
+    }
+
+    private final static class UpdateLedgerCb implements FutureCallback<Void> {
+        final long ledgerId;
+        final AtomicBoolean stop;
+        final AtomicInteger issuedLedgerCnt;
+        final AtomicInteger updatedLedgerCnt;
+        final Set<Long> outstandings;
+        final CountDownLatch syncObj;
+        final UpdateLedgerNotifier progressable;
+
+        public UpdateLedgerCb(long ledgerId, AtomicBoolean stop, AtomicInteger issuedLedgerCnt,
+                AtomicInteger updatedLedgerCnt, Set<Long> outstandings, CountDownLatch
syncObj,
+                UpdateLedgerNotifier progressable) {
+            this.ledgerId = ledgerId;
+            this.stop = stop;
+            this.issuedLedgerCnt = issuedLedgerCnt;
+            this.updatedLedgerCnt = updatedLedgerCnt;
+            this.outstandings = outstandings;
+            this.syncObj = syncObj;
+            this.progressable = progressable;
+        }
+
+        @Override
+        public void onFailure(Throwable th) {
+            LOG.error("Error updating ledger {}", ledgerId, th);
+            stop.set(true);
+            finishUpdateLedger();
+        }
+
+        @Override
+        public void onSuccess(Void obj) {
+            updatedLedgerCnt.incrementAndGet();
+            // may print progress
+            progressable.progress(updatedLedgerCnt.get(), issuedLedgerCnt.get());
+            finishUpdateLedger();
+        }
+
+        private void finishUpdateLedger() {
+            outstandings.remove(ledgerId);
+            if (outstandings.isEmpty() && stop.get()) {
+                LOG.info("Total number of ledgers issued={} updated={}", issuedLedgerCnt.get(),
updatedLedgerCnt.get());
+                syncObj.countDown();
+            }
+        }
+    }
+
+    private final static class ReadLedgerMetadataCb extends AbstractFuture<Void> implements
+            GenericCallback<LedgerMetadata> {
+        final BookKeeper bkc;
+        final Long ledgerId;
+        final BookieSocketAddress curBookieAddr;
+        final BookieSocketAddress toBookieAddr;
+
+        public ReadLedgerMetadataCb(BookKeeper bkc, Long ledgerId, BookieSocketAddress curBookieAddr,
+                BookieSocketAddress toBookieAddr) {
+            this.bkc = bkc;
+            this.ledgerId = ledgerId;
+            this.curBookieAddr = curBookieAddr;
+            this.toBookieAddr = toBookieAddr;
+        }
+
+        @Override
+        public void operationComplete(int rc, LedgerMetadata metadata) {
+            if (BKException.Code.NoSuchLedgerExistsException == rc) {
+                set(null);
+                return; // this is OK
+            } else if (BKException.Code.OK != rc) {
+                // open ledger failed.
+                LOG.error("Get ledger metadata {} failed. Error code {}", ledgerId, rc);
+                setException(BKException.create(rc));
+                return;
+            }
+            boolean updateEnsemble = false;
+            for (ArrayList<BookieSocketAddress> ensembles : metadata.getEnsembles().values())
{
+                int index = ensembles.indexOf(curBookieAddr);
+                if (-1 != index) {
+                    ensembles.set(index, toBookieAddr);
+                    updateEnsemble = true;
+                }
+            }
+            if (!updateEnsemble) {
+                set(null);
+                return; // ledger doesn't contains the given curBookieId
+            }
+            final GenericCallback<Void> writeCb = new GenericCallback<Void>()
{
+                @Override
+                public void operationComplete(int rc, Void result) {
+                    if (rc != BKException.Code.OK) {
+                        // metadata update failed
+                        LOG.error("Ledger {} metadata update failed. Error code {}", ledgerId,
rc);
+                        setException(BKException.create(rc));
+                        return;
+                    }
+                    set(null);
+                }
+            };
+            bkc.getLedgerManager().writeLedgerMetadata(ledgerId, metadata, writeCb);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d17c46fc/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
new file mode 100644
index 0000000..1d56a56
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
@@ -0,0 +1,133 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateLedgerCmdTest extends BookKeeperClusterTestCase {
+
+    private final static Logger LOG = LoggerFactory.getLogger(UpdateLedgerCmdTest.class);
+    private DigestType digestType = DigestType.CRC32;
+    private static final String PASSWORD = "testPasswd";
+
+    public UpdateLedgerCmdTest() {
+        super(3);
+        baseConf.setAllowLoopback(true);
+        baseConf.setGcWaitTime(100000);
+    }
+
+    /**
+     * updateledgers to hostname
+     */
+    @Test(timeout = 120000)
+    public void testUpdateLedgersToHostname() throws Exception {
+        BookKeeper bk = new BookKeeper(baseClientConf, zkc);
+        LOG.info("Create ledger and add entries to it");
+        List<LedgerHandle> ledgers = new ArrayList<LedgerHandle>();
+        LedgerHandle lh1 = createLedgerWithEntries(bk, 0);
+        ledgers.add(lh1);
+        for (int i = 1; i < 40; i++) {
+            ledgers.add(createLedgerWithEntries(bk, 0));
+        }
+
+        String[] argv = new String[] { "updateledgers", "-b", "hostname", "-v", "true", "-p",
"2" };
+        final ServerConfiguration conf = bsConfs.get(0);
+        conf.setUseHostNameAsBookieID(true);
+        BookieSocketAddress toBookieId = Bookie.getBookieAddress(conf);
+        BookieSocketAddress toBookieAddr = new BookieSocketAddress(toBookieId.getHostname()
+ ":"
+                + conf.getBookiePort());
+
+        updateLedgerCmd(argv, 0, conf);
+
+        int updatedLedgersCount = getUpdatedLedgersCount(bk, ledgers, toBookieAddr);
+        Assert.assertEquals("Failed to update the ledger metadata to use bookie host name",
40, updatedLedgersCount);
+    }
+
+    private void updateLedgerCmd(String[] argv, int exitCode, ServerConfiguration conf) throws
KeeperException,
+            InterruptedException, IOException, UnknownHostException, Exception {
+        LOG.info("Perform updateledgers command");
+        BookieShell bkShell = new BookieShell();
+        bkShell.setConf(conf);
+
+        Assert.assertEquals("Failed to return exit code!", exitCode, bkShell.run(argv));
+    }
+
+    private int getUpdatedLedgersCount(BookKeeper bk, List<LedgerHandle> ledgers, BookieSocketAddress
toBookieAddr)
+            throws InterruptedException, BKException {
+        ArrayList<BookieSocketAddress> ensemble;
+        int updatedLedgersCount = 0;
+        for (LedgerHandle lh : ledgers) {
+            // ledger#close() would hit BadVersion exception as rename
+            // increments cversion. But LedgerMetadata#isConflictWith()
+            // gracefully handles this conflicts.
+            lh.close();
+            LedgerHandle openLedger = bk.openLedger(lh.getId(), digestType, PASSWORD.getBytes());
+            ensemble = openLedger.getLedgerMetadata().getEnsemble(0);
+            if (ensemble.contains(toBookieAddr)) {
+                updatedLedgersCount++;
+            }
+        }
+        return updatedLedgersCount;
+    }
+
+    private LedgerHandle createLedgerWithEntries(BookKeeper bk, int numOfEntries) throws
Exception {
+        LedgerHandle lh = bk.createLedger(3, 3, digestType, PASSWORD.getBytes());
+        final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
+        final CountDownLatch latch = new CountDownLatch(numOfEntries);
+
+        final AddCallback cb = new AddCallback() {
+            public void addComplete(int rccb, LedgerHandle lh, long entryId, Object ctx)
{
+                rc.compareAndSet(BKException.Code.OK, rccb);
+                latch.countDown();
+            }
+        };
+        for (int i = 0; i < numOfEntries; i++) {
+            lh.asyncAddEntry(("foobar" + i).getBytes(), cb, null);
+        }
+        if (!latch.await(30, TimeUnit.SECONDS)) {
+            throw new Exception("Entries took too long to add");
+        }
+        if (rc.get() != BKException.Code.OK) {
+            throw BKException.create(rc.get());
+        }
+        return lh;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d17c46fc/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
new file mode 100644
index 0000000..2ea094f
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
@@ -0,0 +1,305 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieShell.UpdateLedgerNotifier;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.MathUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateLedgerOpTest extends BookKeeperClusterTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(UpdateLedgerOpTest.class);
+    private DigestType digestType = DigestType.CRC32;
+    private static final String PASSWORD = "testPasswd";
+    private static final int printprogress = 5;
+
+    public UpdateLedgerOpTest() {
+        super(3);
+        baseConf.setAllowLoopback(true);
+        baseConf.setGcWaitTime(100000);
+    }
+
+    UpdateLedgerNotifier progressable = new UpdateLedgerNotifier() {
+        long lastReport = System.nanoTime();
+
+        @Override
+        public void progress(long updated, long issued) {
+            if (TimeUnit.MILLISECONDS.toSeconds(MathUtils.elapsedMSec(lastReport)) >=
printprogress) {
+                LOG.info("Number of ledgers issued={}, updated={}", issued, updated);
+                lastReport = MathUtils.nowInNano();
+            }
+        }
+    };
+
+    /**
+     * Tests verifies update bookie id when there are many ledgers.
+     */
+    @Test(timeout = 120000)
+    public void testManyLedgers() throws Exception {
+        BookKeeper bk = new BookKeeper(baseClientConf, zkc);
+        BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk);
+
+        LOG.info("Create ledger and add entries to it");
+        List<LedgerHandle> ledgers = new ArrayList<LedgerHandle>();
+        LedgerHandle lh1 = createLedgerWithEntries(bk, 0);
+        ledgers.add(lh1);
+        for (int i = 0; i < 99; i++) {
+            ledgers.add(createLedgerWithEntries(bk, 0));
+        }
+
+        ArrayList<BookieSocketAddress> ensemble = lh1.getLedgerMetadata().getEnsemble(0);
+
+        BookieSocketAddress curBookieAddr = ensemble.get(0);
+        baseConf.setUseHostNameAsBookieID(true);
+        BookieSocketAddress curBookieId = Bookie.getBookieAddress(baseConf);
+        BookieSocketAddress toBookieAddr = new BookieSocketAddress(curBookieId.getHostname()
+ ":"
+                + curBookieAddr.getPort());
+        UpdateLedgerOp updateLedgerOp = new UpdateLedgerOp(bk, bkadmin);
+        updateLedgerOp.updateBookieIdInLedgers(curBookieAddr, toBookieAddr, 5, Integer.MIN_VALUE,
progressable);
+
+        for (LedgerHandle lh : ledgers) {
+            // ledger#close() would hit BadVersion exception as rename
+            // increments cversion. But LedgerMetadata#isConflictWith()
+            // gracefully handles this conflicts.
+            lh.close();
+            LedgerHandle openLedger = bk.openLedger(lh.getId(), digestType, PASSWORD.getBytes());
+            ensemble = openLedger.getLedgerMetadata().getEnsemble(0);
+            Assert.assertTrue("Failed to update the ledger metadata to use bookie host name",
+                    ensemble.contains(toBookieAddr));
+            Assert.assertFalse("Failed to update the ledger metadata to use bookie host name",
+                    ensemble.contains(curBookieAddr));
+        }
+    }
+
+    /**
+     * Tests verifies with limit value lesser than the total number of ledgers.
+     */
+    @Test(timeout = 120000)
+    public void testLimitLessThanTotalLedgers() throws Exception {
+        BookKeeper bk = new BookKeeper(baseClientConf, zkc);
+        BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk);
+
+        LOG.info("Create ledger and add entries to it");
+        List<LedgerHandle> ledgers = new ArrayList<LedgerHandle>();
+        LedgerHandle lh1 = createLedgerWithEntries(bk, 0);
+        ledgers.add(lh1);
+        for (int i = 1; i < 10; i++) {
+            ledgers.add(createLedgerWithEntries(bk, 0));
+        }
+
+        ArrayList<BookieSocketAddress> ensemble = lh1.getLedgerMetadata().getEnsemble(0);
+
+        BookieSocketAddress curBookieAddr = ensemble.get(0);
+        baseConf.setUseHostNameAsBookieID(true);
+        BookieSocketAddress toBookieId = Bookie.getBookieAddress(baseConf);
+        BookieSocketAddress toBookieAddr = new BookieSocketAddress(toBookieId.getHostname()
+ ":"
+                + curBookieAddr.getPort());
+        UpdateLedgerOp updateLedgerOp = new UpdateLedgerOp(bk, bkadmin);
+        updateLedgerOp.updateBookieIdInLedgers(curBookieAddr, toBookieAddr, 7, 4, progressable);
+        int updatedLedgersCount = getUpdatedLedgersCount(bk, ledgers, toBookieAddr);
+        Assert.assertEquals("Failed to update the ledger metadata to use bookie host name",
4, updatedLedgersCount);
+
+        // next execution
+        updateLedgerOp.updateBookieIdInLedgers(curBookieAddr, toBookieAddr, 2, 10, progressable);
+        updatedLedgersCount = getUpdatedLedgersCount(bk, ledgers, toBookieAddr);
+        Assert.assertEquals("Failed to update the ledger metadata to use bookie host name",
10, updatedLedgersCount);
+
+        // no ledgers
+        updateLedgerOp.updateBookieIdInLedgers(curBookieAddr, toBookieAddr, 3, 20, progressable);
+        updatedLedgersCount = getUpdatedLedgersCount(bk, ledgers, toBookieAddr);
+        Assert.assertEquals("Failed to update the ledger metadata to use bookie host name",
10, updatedLedgersCount);
+
+        // no ledgers
+        updateLedgerOp.updateBookieIdInLedgers(curBookieAddr, toBookieAddr, 3, Integer.MIN_VALUE,
progressable);
+        updatedLedgersCount = getUpdatedLedgersCount(bk, ledgers, toBookieAddr);
+        Assert.assertEquals("Failed to update the ledger metadata to use bookie host name",
10, updatedLedgersCount);
+    }
+
+    /**
+     * Tests verifies the ensemble reformation after updating the bookie id in
+     * the existing ensemble
+     */
+    @Test(timeout = 120000)
+    public void testChangeEnsembleAfterRenaming() throws Exception {
+
+        BookKeeper bk = new BookKeeper(baseClientConf, zkc);
+        BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk);
+
+        LOG.info("Create ledger and add entries to it");
+        LedgerHandle lh = createLedgerWithEntries(bk, 100);
+
+        BookieServer bookieServer = bs.get(0);
+        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(0);
+        BookieSocketAddress curBookieAddr = null;
+        for (BookieSocketAddress bookieSocketAddress : ensemble) {
+            if (bookieServer.getLocalAddress().equals(bookieSocketAddress)) {
+                curBookieAddr = bookieSocketAddress;
+            }
+        }
+        Assert.assertNotNull("Couldn't find the bookie in ledger metadata!", curBookieAddr);
+        baseConf.setUseHostNameAsBookieID(true);
+        BookieSocketAddress toBookieId = Bookie.getBookieAddress(baseConf);
+        BookieSocketAddress toBookieAddr = new BookieSocketAddress(toBookieId.getHostname()
+ ":"
+                + curBookieAddr.getPort());
+        UpdateLedgerOp updateLedgerOp = new UpdateLedgerOp(bk, bkadmin);
+        updateLedgerOp.updateBookieIdInLedgers(curBookieAddr, toBookieAddr, 5, 100, progressable);
+
+        bookieServer.shutdown();
+
+        ServerConfiguration serverConf1 = newServerConfiguration();
+        bsConfs.add(serverConf1);
+        bs.add(startBookie(serverConf1));
+
+        // ledger#asyncAddEntry() would hit BadVersion exception as rename incr
+        // cversion. But LedgerMetadata#isConflictWith() gracefully handles
+        // this conflicts.
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
+        lh.asyncAddEntry("foobar".getBytes(), new AddCallback() {
+            @Override
+            public void addComplete(int rccb, LedgerHandle lh, long entryId, Object ctx)
{
+                rc.compareAndSet(BKException.Code.OK, rccb);
+                latch.countDown();
+            }
+        }, null);
+        if (!latch.await(30, TimeUnit.SECONDS)) {
+            throw new Exception("Entries took too long to add");
+        }
+        if (rc.get() != BKException.Code.OK) {
+            throw BKException.create(rc.get());
+        }
+        lh.close();
+        LedgerHandle openLedger = bk.openLedger(lh.getId(), digestType, PASSWORD.getBytes());
+        final LedgerMetadata ledgerMetadata = openLedger.getLedgerMetadata();
+        Assert.assertEquals("Failed to reform ensemble!", 2, ledgerMetadata.getEnsembles().size());
+        ensemble = ledgerMetadata.getEnsemble(0);
+        Assert.assertTrue("Failed to update the ledger metadata to use bookie host name",
+                ensemble.contains(toBookieAddr));
+    }
+
+    /**
+     * Tests verifies simultaneous flow between adding entries and rename of
+     * bookie id
+     */
+    @Test(timeout = 120000)
+    public void testRenameWhenAddEntryInProgress() throws Exception {
+        final BookKeeper bk = new BookKeeper(baseClientConf, zkc);
+        BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk);
+
+        LOG.info("Create ledger and add entries to it");
+        final int numOfEntries = 5000;
+        final CountDownLatch latch = new CountDownLatch(numOfEntries);
+        final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
+        final LedgerHandle lh = createLedgerWithEntries(bk, 1);
+        latch.countDown();
+        Thread th = new Thread() {
+            public void run() {
+                final AddCallback cb = new AddCallback() {
+                    public void addComplete(int rccb, LedgerHandle lh, long entryId, Object
ctx) {
+                        rc.compareAndSet(BKException.Code.OK, rccb);
+                        if (entryId % 100 == 0) {
+                            LOG.info("Added entries till entryId:{}", entryId);
+                        }
+                        latch.countDown();
+                    }
+                };
+                for (int i = 1; i < numOfEntries; i++) {
+                    lh.asyncAddEntry(("foobar" + i).getBytes(), cb, null);
+                }
+
+            };
+        };
+        th.start();
+        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(0);
+        BookieSocketAddress curBookieAddr = ensemble.get(0);
+        BookieSocketAddress toBookieAddr = new BookieSocketAddress("localhost:" + curBookieAddr.getPort());
+        UpdateLedgerOp updateLedgerOp = new UpdateLedgerOp(bk, bkadmin);
+        updateLedgerOp.updateBookieIdInLedgers(curBookieAddr, toBookieAddr, 5, 100, progressable);
+
+        if (!latch.await(120, TimeUnit.SECONDS)) {
+            throw new Exception("Entries took too long to add");
+        }
+        if (rc.get() != BKException.Code.OK) {
+            throw BKException.create(rc.get());
+        }
+        lh.close();
+        LedgerHandle openLedger = bk.openLedger(lh.getId(), digestType, PASSWORD.getBytes());
+        ensemble = openLedger.getLedgerMetadata().getEnsemble(0);
+        Assert.assertTrue("Failed to update the ledger metadata to use bookie host name",
+                ensemble.contains(toBookieAddr));
+    }
+
+    private int getUpdatedLedgersCount(BookKeeper bk, List<LedgerHandle> ledgers, BookieSocketAddress
toBookieAddr)
+            throws InterruptedException, BKException {
+        ArrayList<BookieSocketAddress> ensemble;
+        int updatedLedgersCount = 0;
+        for (LedgerHandle lh : ledgers) {
+            // ledger#close() would hit BadVersion exception as rename
+            // increments cversion. But LedgerMetadata#isConflictWith()
+            // gracefully handles this conflicts.
+            lh.close();
+            LedgerHandle openLedger = bk.openLedger(lh.getId(), digestType, PASSWORD.getBytes());
+            ensemble = openLedger.getLedgerMetadata().getEnsemble(0);
+            if (ensemble.contains(toBookieAddr)) {
+                updatedLedgersCount++;
+            }
+        }
+        return updatedLedgersCount;
+    }
+
+    private LedgerHandle createLedgerWithEntries(BookKeeper bk, int numOfEntries) throws
Exception {
+        LedgerHandle lh = bk.createLedger(3, 3, digestType, PASSWORD.getBytes());
+        final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
+        final CountDownLatch latch = new CountDownLatch(numOfEntries);
+
+        final AddCallback cb = new AddCallback() {
+            public void addComplete(int rccb, LedgerHandle lh, long entryId, Object ctx)
{
+                rc.compareAndSet(BKException.Code.OK, rccb);
+                latch.countDown();
+            }
+        };
+        for (int i = 0; i < numOfEntries; i++) {
+            lh.asyncAddEntry(("foobar" + i).getBytes(), cb, null);
+        }
+        if (!latch.await(30, TimeUnit.SECONDS)) {
+            throw new Exception("Entries took too long to add");
+        }
+        if (rc.get() != BKException.Code.OK) {
+            throw BKException.create(rc.get());
+        }
+        return lh;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d17c46fc/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index fbb79b0..0160640 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -82,6 +82,7 @@ public abstract class BookKeeperClusterTestCase {
 
     public BookKeeperClusterTestCase(int numBookies) {
         this.numBookies = numBookies;
+        baseConf.setAllowLoopback(true);
     }
 
     @Before
@@ -199,7 +200,6 @@ public abstract class BookKeeperClusterTestCase {
         conf.setBookiePort(port);
         conf.setZkServers(zkServers);
         conf.setJournalDirName(journalDir.getPath());
-        conf.setAllowLoopback(true);
         String[] ledgerDirNames = new String[ledgerDirs.length];
         for (int i=0; i<ledgerDirs.length; i++) {
             ledgerDirNames[i] = ledgerDirs[i].getPath();


Mime
View raw message