zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1371255 - in /zookeeper/bookkeeper/trunk: ./ hedwig-server/src/main/java/org/apache/hedwig/server/common/ hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ hedwig-server/src/test/java/org/apache/hedwig/server/persistence/
Date Thu, 09 Aug 2012 15:26:17 GMT
Author: ivank
Date: Thu Aug  9 15:26:17 2012
New Revision: 1371255

URL: http://svn.apache.org/viewvc?rev=1371255&view=rev
Log:
BOOKKEEPER-191: Hub server should change ledger to write, so consumed messages have chance
to be garbage collected. (sijie via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1371255&r1=1371254&r2=1371255&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Aug  9 15:26:17 2012
@@ -66,6 +66,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-259: Create a topic manager using versioned write for leader election
(sijie via ivank)
 
+        BOOKKEEPER-191: Hub server should change ledger to write, so consumed messages have
chance to be garbage collected. (sijie via ivank)
+
     IMPROVEMENTS:
 
       bookkeeper-server:

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1371255&r1=1371254&r2=1371255&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
(original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
Thu Aug  9 15:26:17 2012
@@ -61,6 +61,7 @@ public class ServerConfiguration extends
     protected final static String BK_ENSEMBLE_SIZE = "bk_ensemble_size";
     protected final static String BK_QUORUM_SIZE = "bk_quorum_size";
     protected final static String RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL = "retry_remote_subscribe_thread_run_interval";
+    protected final static String MAX_ENTRIES_PER_LEDGER = "max_entries_per_ledger";
 
     // manager related settings
     protected final static String METADATA_MANAGER_BASED_TOPIC_MANAGER_ENABLED = "metadata_manager_based_topic_manager_enabled";
@@ -295,6 +296,17 @@ public class ServerConfiguration extends
         return conf.getInt(BK_QUORUM_SIZE, 2);
     }
 
+    /**
+     * This parameter is used when BookKeeper is the persistence storage,
+     * and indicates when the number of entries stored in a ledger reach
+     * the threshold, hub server will open a new ledger to write.
+     *
+     * @return max entries per ledger
+     */
+    public long getMaxEntriesPerLedger() {
+        return conf.getLong(MAX_ENTRIES_PER_LEDGER, 0L);
+    }
+
     /*
      * Is this a valid configuration that we can run with? This code might grow
      * over time.

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1371255&r1=1371254&r2=1371255&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
(original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
Thu Aug  9 15:26:17 2012
@@ -20,14 +20,16 @@ package org.apache.hedwig.server.persist
 import java.io.IOException;
 import java.util.Enumeration;
 import java.util.Iterator;
-import java.util.Set;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -78,6 +80,10 @@ public class BookkeeperPersistenceManage
     private ServerConfiguration cfg;
     private TopicManager tm;
 
+    // max number of entries allowed in a ledger
+    private static final long UNLIMITED_ENTRIES = 0L;
+    private final long maxEntriesPerLedger;
+
     static class InMemoryLedgerRange {
         LedgerRange range;
         long startSeqIdIncluded; // included, for the very first ledger, this
@@ -131,6 +137,19 @@ public class BookkeeperPersistenceManage
          */
         AtomicBoolean doRelease = new AtomicBoolean(false);
 
+        /**
+         * Flag indicats the topic is changing ledger
+         */
+        AtomicBoolean doChangeLedger = new AtomicBoolean(false);
+        /**
+         * Last seq id to change ledger.
+         */
+        long lastSeqIdBeforeLedgerChange = -1;
+        /**
+         * List to buffer all persist requests during changing ledger.
+         */
+        LinkedList<PersistRequest> deferredRequests = null;
+
         final static int UNLIMITED = 0;
         int messageBound = UNLIMITED;
     }
@@ -160,6 +179,7 @@ public class BookkeeperPersistenceManage
         this.tpManager = metaManagerFactory.newTopicPersistenceManager();
         this.cfg = cfg;
         this.tm = tm;
+        this.maxEntriesPerLedger = cfg.getMaxEntriesPerLedger();
         queuer = new TopicOpQueuer(executor);
         tm.addTopicOwnershipChangeListener(this);
     }
@@ -460,6 +480,51 @@ public class BookkeeperPersistenceManage
         return Math.max(seqId + skipAmount, getMinSeqIdForTopic(topic));
     }
 
+    /**
+     * Release topic on failure
+     *
+     * @param topic
+     *          Topic Name
+     * @param e
+     *          Failure Exception
+     * @param ctx
+     *          Callback context
+     */
+    protected void releaseTopicIfRequested(final ByteString topic, Exception e, Object ctx)
{
+        TopicInfo topicInfo = topicInfos.get(topic);
+        if (topicInfo == null) {
+            logger.warn("No topic found when trying to release ownership of topic " + topic.toStringUtf8()
+                      + " on failure.");
+            return;
+        }
+        // do release owner ship of topic
+        if (topicInfo.doRelease.compareAndSet(false, true)) {
+            logger.info("Release topic " + topic.toStringUtf8() + " when bookkeeper persistence
mananger encounters failure :",
+                        e);
+            tm.releaseTopic(topic, new Callback<Void>() {
+                @Override
+                public void operationFailed(Object ctx, PubSubException exception) {
+                    logger.error("Exception found on releasing topic " + topic.toStringUtf8()
+                               + " when encountering exception from bookkeeper:", exception);
+                }
+                @Override
+                public void operationFinished(Object ctx, Void resultOfOperation) {
+                    logger.info("successfully releasing topic {} when encountering"
+                              + " exception from bookkeeper", topic.toStringUtf8());
+                }
+            }, null);
+        }
+        // if release happens when the topic is changing ledger
+        // we need to fail all queued persist requests
+        if (topicInfo.doChangeLedger.get()) {
+            for (PersistRequest pr : topicInfo.deferredRequests) {
+                pr.getCallback().operationFailed(ctx, new PubSubException.ServiceDownException(e));
+            }
+            topicInfo.deferredRequests.clear();
+            topicInfo.lastSeqIdBeforeLedgerChange = -1;
+        }
+    }
+
     public class PersistOp extends TopicOpQueuer.SynchronousOp {
         PersistRequest request;
 
@@ -470,79 +535,134 @@ public class BookkeeperPersistenceManage
 
         @Override
         public void runInternal() {
-            final TopicInfo topicInfo = topicInfos.get(topic);
+            doPersistMessage(request);
+        }
+    }
 
-            if (topicInfo == null) {
-                request.getCallback().operationFailed(request.ctx,
-                                                 new PubSubException.ServerNotResponsibleForTopicException(""));
-                return;
-            }
+    /**
+     * Persist a message by executing a persist request.
+     */
+    protected void doPersistMessage(final PersistRequest request) {
+        final ByteString topic = request.topic;
+        final TopicInfo topicInfo = topicInfos.get(topic);
 
-            if (topicInfo.doRelease.get()) {
-                request.getCallback().operationFailed(request.ctx, new PubSubException.ServiceDownException(
-                    "The ownership of the topic is releasing due to unrecoverable issue."));
-                return;
-            }
+        if (topicInfo == null) {
+            request.getCallback().operationFailed(request.ctx,
+                                             new PubSubException.ServerNotResponsibleForTopicException(""));
+            return;
+        }
 
-            final long localSeqId = topicInfo.lastSeqIdPushed.getLocalComponent() + 1;
-            MessageSeqId.Builder builder = MessageSeqId.newBuilder();
-            if (request.message.hasMsgId()) {
-                MessageIdUtils.takeRegionMaximum(builder, topicInfo.lastSeqIdPushed, request.message.getMsgId());
-            } else {
-                builder.addAllRemoteComponents(topicInfo.lastSeqIdPushed.getRemoteComponentsList());
-            }
-            builder.setLocalComponent(localSeqId);
+        if (topicInfo.doRelease.get()) {
+            request.getCallback().operationFailed(request.ctx, new PubSubException.ServiceDownException(
+                "The ownership of the topic is releasing due to unrecoverable issue."));
+            return;
+        }
 
-            topicInfo.lastSeqIdPushed = builder.build();
-            Message msgToSerialize = Message.newBuilder(request.message).setMsgId(topicInfo.lastSeqIdPushed).build();
+        // if the topic is changing ledger, queue following persist requests until ledger
is changed
+        if (topicInfo.doChangeLedger.get()) {
+            logger.info("Topic {} is changing ledger, so queue persist request for message.",
+                        topic.toStringUtf8());
+            topicInfo.deferredRequests.add(request);
+            return;
+        }
 
-            final MessageSeqId responseSeqId = msgToSerialize.getMsgId();
+        final long localSeqId = topicInfo.lastSeqIdPushed.getLocalComponent() + 1;
+        MessageSeqId.Builder builder = MessageSeqId.newBuilder();
+        if (request.message.hasMsgId()) {
+            MessageIdUtils.takeRegionMaximum(builder, topicInfo.lastSeqIdPushed, request.message.getMsgId());
+        } else {
+            builder.addAllRemoteComponents(topicInfo.lastSeqIdPushed.getRemoteComponentsList());
+        }
+        builder.setLocalComponent(localSeqId);
 
-            topicInfo.currentLedgerRange.handle.asyncAddEntry(msgToSerialize.toByteArray(),
-            new SafeAsynBKCallback.AddCallback() {
-                @Override
-                public void safeAddComplete(int rc, LedgerHandle lh, long entryId, Object
ctx) {
-                    if (rc != BKException.Code.OK) {
-                        BKException bke = BKException.create(rc);
-                        logger.error("Error while persisting entry to ledger: " + lh.getId()
+ " for topic: "
-                                     + topic.toStringUtf8(), bke);
+        // check whether reach the threshold of a ledger, if it does,
+        // open a ledger to write
+        long entriesInThisLedger = localSeqId - topicInfo.currentLedgerRange.startSeqIdIncluded
+ 1;
+        if (UNLIMITED_ENTRIES != maxEntriesPerLedger &&
+            entriesInThisLedger >= maxEntriesPerLedger) {
+            if (topicInfo.doChangeLedger.compareAndSet(false, true)) {
+                // for order guarantees, we should wait until all the adding operations for
current ledger
+                // are succeed. so we just mark it as lastSeqIdBeforeLedgerChange
+                // when the lastSeqIdBeforeLedgerChange acked, we do changing the ledger
+                if (null == topicInfo.deferredRequests) {
+                    topicInfo.deferredRequests = new LinkedList<PersistRequest>();
+                }
+                topicInfo.lastSeqIdBeforeLedgerChange = localSeqId;
+            }
+        }
 
-                        // To preserve ordering guarantees, we
-                        // should give up the topic and not let
-                        // other operations through
-                        if (topicInfo.doRelease.compareAndSet(false, true)) {
-                            tm.releaseTopic(request.topic, new Callback<Void>() {
-                                @Override
-                                public void operationFailed(Object ctx, PubSubException exception)
{
-                                    logger.error("Exception found on releasing topic " +
request.topic.toStringUtf8()
-                                               + " when encountering exception from bookkeeper:",
exception);
-                                }
-                                @Override
-                                public void operationFinished(Object ctx, Void resultOfOperation)
{
-                                    logger.debug("successfully releasing topic {} when encountering"
-                                                 + " exception from bookkeeper", request.topic.toStringUtf8());
-                                }
-                            }, null);
-                        }
-                        request.getCallback().operationFailed(ctx, new PubSubException.ServiceDownException(bke));
-                        return;
-                    }
+        topicInfo.lastSeqIdPushed = builder.build();
+        Message msgToSerialize = Message.newBuilder(request.message).setMsgId(topicInfo.lastSeqIdPushed).build();
 
-                    if (entryId + topicInfo.currentLedgerRange.startSeqIdIncluded != localSeqId)
{
-                        String msg = "Expected BK to assign entry-id: "
-                                     + (localSeqId - topicInfo.currentLedgerRange.startSeqIdIncluded)
-                                     + " but it instead assigned entry-id: " + entryId +
" topic: "
-                                     + topic.toStringUtf8() + "ledger: " + lh.getId();
-                        logger.error(msg);
-                        throw new UnexpectedError(msg);
-                    }
+        final MessageSeqId responseSeqId = msgToSerialize.getMsgId();
+        topicInfo.currentLedgerRange.handle.asyncAddEntry(msgToSerialize.toByteArray(),
+        new SafeAsynBKCallback.AddCallback() {
+            AtomicBoolean processed = new AtomicBoolean(false);
+            @Override
+            public void safeAddComplete(int rc, LedgerHandle lh, long entryId, Object ctx)
{
 
-                    topicInfo.lastEntryIdAckedInCurrentLedger = entryId;
-                    request.getCallback().operationFinished(ctx, responseSeqId);
+                // avoid double callback by mistake, since we may do change ledger in this
callback.
+                if (!processed.compareAndSet(false, true)) {
+                    return;
+                }
+                if (rc != BKException.Code.OK) {
+                    BKException bke = BKException.create(rc);
+                    logger.error("Error while persisting entry to ledger: " + lh.getId()
+ " for topic: "
+                                 + topic.toStringUtf8(), bke);
+                    request.getCallback().operationFailed(ctx, new PubSubException.ServiceDownException(bke));
+
+                    // To preserve ordering guarantees, we
+                    // should give up the topic and not let
+                    // other operations through
+                    releaseTopicIfRequested(request.topic, bke, ctx);
+                    return;
                 }
-            }, request.ctx);
 
-        }
+                if (entryId + topicInfo.currentLedgerRange.startSeqIdIncluded != localSeqId)
{
+                    String msg = "Expected BK to assign entry-id: "
+                                 + (localSeqId - topicInfo.currentLedgerRange.startSeqIdIncluded)
+                                 + " but it instead assigned entry-id: " + entryId + " topic:
"
+                                 + topic.toStringUtf8() + "ledger: " + lh.getId();
+                    logger.error(msg);
+                    throw new UnexpectedError(msg);
+                }
+
+                topicInfo.lastEntryIdAckedInCurrentLedger = entryId;
+                request.getCallback().operationFinished(ctx, responseSeqId);
+                // if this acked entry is the last entry of current ledger
+                // we can add a ChangeLedgerOp to execute to change ledger
+                if (topicInfo.doChangeLedger.get() &&
+                    entryId + topicInfo.currentLedgerRange.startSeqIdIncluded == topicInfo.lastSeqIdBeforeLedgerChange)
{
+                    // change ledger
+                    changeLedger(topic, new Callback<Void>() {
+                        @Override
+                        public void operationFailed(Object ctx, PubSubException exception)
{
+                            logger.error("Failed to change ledger for topic " + topic.toStringUtf8(),
exception);
+                            // change ledger failed, we should give up topic
+                            releaseTopicIfRequested(request.topic, exception, ctx);
+                        }
+                        @Override
+                        public void operationFinished(Object ctx, Void resultOfOperation)
{
+                            topicInfo.doChangeLedger.set(false);
+                            topicInfo.lastSeqIdBeforeLedgerChange = -1;
+                            // the ledger is changed, persist queued requests
+                            // if the number of queued persist requests is more than maxEntriesPerLedger
+                            // we just persist maxEntriesPerLedger requests, other requests
are still queued
+                            // until next ledger changed.
+                            int numRequests = 0;
+                            while (!topicInfo.deferredRequests.isEmpty() &&
+                                   numRequests < maxEntriesPerLedger) {
+                                PersistRequest pr = topicInfo.deferredRequests.removeFirst();
+                                doPersistMessage(pr);
+                                ++numRequests;
+                            }
+                            logger.debug("Finished persisting {} queued requests, but there
are still {} requests in queue.",
+                                         numRequests, topicInfo.deferredRequests.size());
+                        }
+                    }, ctx);
+                }
+            }
+        }, request.ctx);
     }
 
     public void persistMessage(PersistRequest request) {
@@ -622,7 +742,7 @@ public class BookkeeperPersistenceManage
             }
 
             // All ledgers were found properly closed, just start a new one
-            openNewTopicLedger(version, topicInfo);
+            openNewTopicLedger(topic, version, topicInfo, false, cb, ctx);
         }
 
         /**
@@ -654,7 +774,7 @@ public class BookkeeperPersistenceManage
                         // couldn't write to, so just ignore it
                         logger.info("Pruning empty ledger: " + ledgerId + " for topic: "
+ topic.toStringUtf8());
                         closeLedger(ledgerHandle);
-                        openNewTopicLedger(expectedVersionOfLedgerNode, topicInfo);
+                        openNewTopicLedger(topic, expectedVersionOfLedgerNode, topicInfo,
false, cb, ctx);
                         return;
                     }
 
@@ -695,7 +815,7 @@ public class BookkeeperPersistenceManage
                             logger.info("Recovered unclosed ledger: " + ledgerId + " for
topic: "
                                         + topic.toStringUtf8() + " with " + numEntriesInLastLedger
+ " entries");
 
-                            openNewTopicLedger(expectedVersionOfLedgerNode, topicInfo);
+                            openNewTopicLedger(topic, expectedVersionOfLedgerNode, topicInfo,
false, cb, ctx);
                         }
                     }, ctx);
 
@@ -703,69 +823,83 @@ public class BookkeeperPersistenceManage
 
             }, ctx);
         }
+    }
 
-        /**
-         *
-         * @param requiredVersionOfLedgersNode
-         *            The version of the ledgers node when we read it, should be
-         *            the same when we try to write
-         */
-        private void openNewTopicLedger(final Version expectedVersionOfLedgersNode, final
TopicInfo topicInfo) {
-            bk.asyncCreateLedger(cfg.getBkEnsembleSize(), cfg.getBkQuorumSize(), DigestType.CRC32,
passwd,
-            new SafeAsynBKCallback.CreateCallback() {
-                boolean processed = false;
-
-                @Override
-                public void safeCreateComplete(int rc, LedgerHandle lh, Object ctx) {
-                    if (processed) {
-                        return;
-                    } else {
-                        processed = true;
-                    }
+    /**
+     * Open New Ledger to write for a topic.
+     *
+     * @param topic
+     *          Topic Name
+     * @param expectedVersionOfLedgersNode
+     *          Expected Version to Update Ledgers Node.
+     * @param topicInfo
+     *          Topic Information
+     * @param changeLedger
+     *          Whether is it called when changing ledger
+     * @param cb
+     *          Callback to trigger after opening new ledger.
+     * @param ctx
+     *          Callback context.
+     */
+    void openNewTopicLedger(final ByteString topic,
+                            final Version expectedVersionOfLedgersNode, final TopicInfo topicInfo,
+                            final boolean changeLedger,
+                            final Callback<Void> cb, final Object ctx) {
+        bk.asyncCreateLedger(cfg.getBkEnsembleSize(), cfg.getBkQuorumSize(), DigestType.CRC32,
passwd,
+        new SafeAsynBKCallback.CreateCallback() {
+            AtomicBoolean processed = new AtomicBoolean(false);
+
+            @Override
+            public void safeCreateComplete(int rc, LedgerHandle lh, Object ctx) {
+                if (!processed.compareAndSet(false, true)) {
+                    return;
+                }
 
-                    if (rc != BKException.Code.OK) {
-                        BKException bke = BKException.create(rc);
-                        logger.error("Could not create new ledger while acquiring topic:
"
-                                     + topic.toStringUtf8(), bke);
-                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
-                        return;
-                    }
+                if (rc != BKException.Code.OK) {
+                    BKException bke = BKException.create(rc);
+                    logger.error("Could not create new ledger while acquiring topic: "
+                                 + topic.toStringUtf8(), bke);
+                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
+                    return;
+                }
 
+                // compute last seq id
+                if (!changeLedger) {
                     topicInfo.lastSeqIdPushed = topicInfo.ledgerRanges.isEmpty() ? MessageSeqId.newBuilder()
                                                 .setLocalComponent(0).build() : topicInfo.ledgerRanges.lastEntry().getValue().range
                                                 .getEndSeqIdIncluded();
+                }
 
-                    LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(lh.getId()).build();
-                    topicInfo.currentLedgerRange = new InMemoryLedgerRange(lastRange, topicInfo.lastSeqIdPushed
-                            .getLocalComponent() + 1, lh);
-
-                    // Persist the fact that we started this new
-                    // ledger to ZK
+                LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(lh.getId()).build();
+                topicInfo.currentLedgerRange = new InMemoryLedgerRange(lastRange, topicInfo.lastSeqIdPushed
+                        .getLocalComponent() + 1, lh);
+
+                // Persist the fact that we started this new
+                // ledger to ZK
+
+                LedgerRanges.Builder builder = LedgerRanges.newBuilder();
+                for (InMemoryLedgerRange imlr : topicInfo.ledgerRanges.values()) {
+                    builder.addRanges(imlr.range);
+                }
+                builder.addRanges(lastRange);
 
-                    LedgerRanges.Builder builder = LedgerRanges.newBuilder();
-                    for (InMemoryLedgerRange imlr : topicInfo.ledgerRanges.values()) {
-                        builder.addRanges(imlr.range);
+                tpManager.writeTopicPersistenceInfo(
+                topic, builder.build(), expectedVersionOfLedgersNode, new Callback<Version>()
{
+                    @Override
+                    public void operationFinished(Object ctx, Version newVersion) {
+                        // Finally, all done
+                        topicInfo.ledgerRangesVersion = newVersion;
+                        topicInfos.put(topic, topicInfo);
+                        cb.operationFinished(ctx, null);
                     }
-                    builder.addRanges(lastRange);
-
-                    tpManager.writeTopicPersistenceInfo(
-                    topic, builder.build(), expectedVersionOfLedgersNode, new Callback<Version>()
{
-                        @Override
-                        public void operationFinished(Object ctx, Version newVersion) {
-                            // Finally, all done
-                            topicInfo.ledgerRangesVersion = newVersion;
-                            topicInfos.put(topic, topicInfo);
-                            cb.operationFinished(ctx, null);
-                        }
-                        @Override
-                        public void operationFailed(Object ctx, PubSubException exception)
{
-                            cb.operationFailed(ctx, exception);
-                        }
-                    }, ctx);
-                    return;
-                }
-            }, ctx);
-        }
+                    @Override
+                    public void operationFailed(Object ctx, PubSubException exception) {
+                        cb.operationFailed(ctx, exception);
+                    }
+                }, ctx);
+                return;
+            }
+        }, ctx);
     }
 
     /**
@@ -781,6 +915,71 @@ public class BookkeeperPersistenceManage
         queuer.pushAndMaybeRun(topic, new AcquireOp(topic, callback, ctx));
     }
 
+    /**
+     * Change ledger to write for a topic.
+     */
+    class ChangeLedgerOp extends TopicOpQueuer.AsynchronousOp<Void> {
+
+        public ChangeLedgerOp(ByteString topic, Callback<Void> cb, Object ctx) {
+            queuer.super(topic, cb, ctx);
+        }
+
+        @Override
+        public void run() {
+            TopicInfo topicInfo = topicInfos.get(topic);
+            if (null == topicInfo) {
+                logger.error("Weired! hub server doesn't own topic " + topic.toStringUtf8()
+                           + " when changing ledger to write.");
+                cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
+                return;
+            }
+            closeLastTopicLedgerAndOpenNewOne(topicInfo);
+        }
+
+        private void closeLastTopicLedgerAndOpenNewOne(final TopicInfo topicInfo) {
+            final long ledgerId = topicInfo.currentLedgerRange.handle.getId();
+            topicInfo.currentLedgerRange.handle.asyncClose(new CloseCallback() {
+                AtomicBoolean processed = new AtomicBoolean(false);
+                @Override
+                public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
+                    if (!processed.compareAndSet(false, true)) {
+                        return;
+                    }
+                    if (BKException.Code.OK != rc) {
+                        BKException bke = BKException.create(rc);
+                        logger.error("Could not close ledger " + ledgerId
+                                   + " while changing ledger of topic " + topic.toStringUtf8(),
bke);
+                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
+                        return;
+                    }
+                    // update last range
+                    LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(ledgerId)
+                                            .setEndSeqIdIncluded(topicInfo.lastSeqIdPushed).build();
+
+                    topicInfo.currentLedgerRange.range = lastRange;
+                    // put current ledger to ledger ranges
+                    topicInfo.ledgerRanges.put(topicInfo.lastSeqIdPushed.getLocalComponent(),
+                                               topicInfo.currentLedgerRange);
+                    logger.info("Closed written ledger " + ledgerId + " for topic "
+                              + topic.toStringUtf8() + " to change ledger.");
+                    openNewTopicLedger(topic, topicInfo.ledgerRangesVersion,
+                                       topicInfo, true, cb, ctx);
+                }
+            }, ctx);
+        }
+
+    }
+
+    /**
+     * Change ledger to write for a topic.
+     *
+     * @param topic
+     *          Topic Name
+     */
+    protected void changeLedger(ByteString topic, Callback<Void> cb, Object ctx) {
+        queuer.pushAndMaybeRun(topic, new ChangeLedgerOp(topic, cb, ctx));
+    }
+
     public void closeLedger(LedgerHandle lh) {
         // try {
         // lh.asyncClose(noOpCloseCallback, null);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java?rev=1371255&r1=1371254&r2=1371255&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
(original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
Thu Aug  9 15:26:17 2012
@@ -26,6 +26,7 @@ import junit.framework.TestCase;
 
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.hedwig.protocol.PubSubProtocol;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,6 +35,7 @@ import org.apache.hedwig.util.Either;
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.HelperMethods;
 import org.apache.hedwig.StubCallback;
+import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.meta.MetadataManagerFactory;
@@ -41,8 +43,14 @@ import org.apache.hedwig.server.topics.T
 import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
 import org.apache.hedwig.util.ConcurrencyUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class TestBookkeeperPersistenceManagerWhiteBox extends TestCase {
 
+    protected static Logger logger =
+        LoggerFactory.getLogger(TestBookkeeperPersistenceManagerWhiteBox.class);
+
     BookKeeperTestBase bktb;
     private final int numBookies = 3;
     BookkeeperPersistenceManager bkpm;
@@ -91,6 +99,7 @@ public class TestBookkeeperPersistenceMa
         assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
     }
 
+    @Test
     public void testNonEmptyDirtyLedger() throws Exception {
 
         Random r = new Random();
@@ -112,18 +121,165 @@ public class TestBookkeeperPersistenceMa
             bkpm.persistMessage(new PersistRequest(topic, messages.get(index), persistCallback,
null));
             assertEquals(index + 1, ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent());
 
+            index++;
             // once in every 10 times, give up ledger
             if (r.nextInt(10) == 9) {
-                // Make the bkpm lose its memory
-                bkpm.topicInfos.clear();
-                numPrevLedgers++;
+                // should not release topic when the message is last message
+                // otherwise when we call scan, bookkeeper persistence manager doesn't own
the topic
+                if (index < messages.size()) {
+                    // Make the bkpm lose its memory
+                    bkpm.topicInfos.clear();
+                    numPrevLedgers++;
+                }
             }
+        }
+
+        // Lets scan now
+        StubScanCallback scanCallback = new StubScanCallback();
+        bkpm.scanMessages(new RangeScanRequest(topic, 1, NUM_MESSAGES_TO_TEST, Long.MAX_VALUE,
scanCallback, null));
+        for (int i = 0; i < messages.size(); i++) {
+            Message scannedMessage = ConcurrencyUtils.take(scanCallback.queue).left();
+            assertTrue(messages.get(i).getBody().equals(scannedMessage.getBody()));
+            assertEquals(i + 1, scannedMessage.getMsgId().getLocalComponent());
+        }
+        assertTrue(StubScanCallback.END_MESSAGE == ConcurrencyUtils.take(scanCallback.queue).left());
+
+    }
+
+    static final long maxEntriesPerLedger = 10;
+
+    class ChangeLedgerServerConfiguration extends ServerConfiguration {
+        @Override
+        public long getMaxEntriesPerLedger() {
+            return maxEntriesPerLedger;
+        }
+    }
+
+    @Test
+    public void testSyncChangeLedgers() throws Exception {
+        int NUM_MESSAGES_TO_TEST = 101;
+        int SIZE_OF_MESSAGES_TO_TEST = 100;
+        int index = 0;
+        List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST,
+                                 SIZE_OF_MESSAGES_TO_TEST);
+
+        bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm,
+                                                new ChangeLedgerServerConfiguration(), scheduler);
+
+        // acquire the topic
+        StubCallback<Void> stubCallback = new StubCallback<Void>();
+        bkpm.acquiredTopic(topic, stubCallback, null);
+        assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
+        assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
+
+        while (index < messages.size()) {
+            logger.debug("Persist message {}", (index + 1));
+            StubCallback<MessageSeqId> persistCallback = new StubCallback<MessageSeqId>();
+            bkpm.persistMessage(new PersistRequest(topic, messages.get(index), persistCallback,
null));
+            assertEquals(index + 1, ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent());
+
             index++;
+            if (index % maxEntriesPerLedger == 1) {
+                assertEquals(index / maxEntriesPerLedger, bkpm.topicInfos.get(topic).ledgerRanges.size());
+            }
         }
+        assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger, bkpm.topicInfos.get(topic).ledgerRanges.size());
 
-        // ensure the bkpm has the topic before scanning
+        // Lets scan now
+        StubScanCallback scanCallback = new StubScanCallback();
+        bkpm.scanMessages(new RangeScanRequest(topic, 1, NUM_MESSAGES_TO_TEST, Long.MAX_VALUE,
scanCallback, null));
+        for (int i = 0; i < messages.size(); i++) {
+            Message scannedMessage = ConcurrencyUtils.take(scanCallback.queue).left();
+            assertTrue(messages.get(i).getBody().equals(scannedMessage.getBody()));
+            assertEquals(i + 1, scannedMessage.getMsgId().getLocalComponent());
+        }
+        assertTrue(StubScanCallback.END_MESSAGE == ConcurrencyUtils.take(scanCallback.queue).left());
+
+        // Make the bkpm lose its memory
+        bkpm.topicInfos.clear();
+
+        // acquire the topic again
+        stubCallback = new StubCallback<Void>();
+        bkpm.acquiredTopic(topic, stubCallback, null);
+        assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
+        assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger + 1, bkpm.topicInfos.get(topic).ledgerRanges.size());
+    }
+
+    class OrderCheckingCallback extends StubCallback<MessageSeqId> {
+        long curMsgId;
+        int numMessages;
+        int numProcessed;
+        int numSuccess;
+        int numFailed;
+
+        OrderCheckingCallback(long startMsgId, int numMessages) {
+            this.curMsgId = startMsgId;
+            this.numMessages = numMessages;
+            numProcessed = numSuccess = numFailed = 0;
+        }
+
+        @Override
+        public void operationFailed(Object ctx, final PubSubException exception) {
+            synchronized (this) {
+                ++numFailed;
+                ++numProcessed;
+                if (numProcessed == numMessages) {
+                    MessageSeqId.Builder seqIdBuilder =
+                        MessageSeqId.newBuilder().setLocalComponent(curMsgId);
+                    super.operationFinished(ctx, seqIdBuilder.build());
+                }
+            }
+        }
+
+        @Override
+        public void operationFinished(Object ctx, final MessageSeqId seqId) {
+            synchronized(this) {
+                long msgId = seqId.getLocalComponent();
+                if (msgId == curMsgId) {
+                    ++curMsgId;
+                }
+                ++numSuccess;
+                ++numProcessed;
+                if (numProcessed == numMessages) {
+                    MessageSeqId.Builder seqIdBuilder =
+                        MessageSeqId.newBuilder().setLocalComponent(curMsgId);
+                    super.operationFinished(ctx, seqIdBuilder.build());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testAsyncChangeLedgers() throws Exception {
+        int NUM_MESSAGES_TO_TEST = 101;
+        int SIZE_OF_MESSAGES_TO_TEST = 100;
+        List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST,
+                                 SIZE_OF_MESSAGES_TO_TEST);
+
+        bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm,
+                                                new ChangeLedgerServerConfiguration(), scheduler);
+
+        // acquire the topic
         StubCallback<Void> stubCallback = new StubCallback<Void>();
         bkpm.acquiredTopic(topic, stubCallback, null);
+        assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
+        assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
+
+        OrderCheckingCallback persistCallback =
+            new OrderCheckingCallback(1, NUM_MESSAGES_TO_TEST);
+        for (Message message : messages) {
+            bkpm.persistMessage(new PersistRequest(topic, message, persistCallback, null));
+        }
+        assertEquals(NUM_MESSAGES_TO_TEST + 1,
+                     ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent());
+        assertEquals(NUM_MESSAGES_TO_TEST, persistCallback.numSuccess);
+        assertEquals(0, persistCallback.numFailed);
+        assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger,
+                     bkpm.topicInfos.get(topic).ledgerRanges.size());
+
+        // ensure the bkpm has the topic before scanning
+        stubCallback = new StubCallback<Void>();
+        bkpm.acquiredTopic(topic, stubCallback, null);
 
         // Lets scan now
         StubScanCallback scanCallback = new StubScanCallback();
@@ -140,6 +296,64 @@ public class TestBookkeeperPersistenceMa
         }
         assertTrue(StubScanCallback.END_MESSAGE == ConcurrencyUtils.take(scanCallback.queue).left());
 
+        // Make the bkpm lose its memory
+        bkpm.topicInfos.clear();
+
+        // acquire the topic again
+        stubCallback = new StubCallback<Void>();
+        bkpm.acquiredTopic(topic, stubCallback, null);
+        assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
+        assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger + 1,
+                     bkpm.topicInfos.get(topic).ledgerRanges.size());
+    }
+
+    class ChangeLedgerCallback extends OrderCheckingCallback {
+        boolean tearDown = false;
+        ChangeLedgerCallback(long startMsgId, int numMessages) {
+            super(startMsgId, numMessages);
+        }
+
+        @Override
+        public void operationFinished(Object ctx, final MessageSeqId msgId) {
+            super.operationFinished(ctx, msgId);
+            // shutdown bookie server when changing ledger
+            // so following requests should fail
+            if (msgId.getLocalComponent() >= maxEntriesPerLedger && !tearDown)
{
+                try {
+                    bktb.tearDownOneBookieServer();
+                    bktb.tearDownOneBookieServer();
+                } catch (Exception e) {
+                    logger.error("Failed to tear down bookie server.");
+                }
+                tearDown = true;
+            }
+        }
     }
 
+    @Test
+    public void testChangeLedgerFailure() throws Exception {
+        int NUM_MESSAGES_TO_TEST = 101;
+        int SIZE_OF_MESSAGES_TO_TEST = 100;
+        List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST,
+                                 SIZE_OF_MESSAGES_TO_TEST);
+
+        bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm,
+                                                new ChangeLedgerServerConfiguration(), scheduler);
+
+        // acquire the topic
+        StubCallback<Void> stubCallback = new StubCallback<Void>();
+        bkpm.acquiredTopic(topic, stubCallback, null);
+        assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
+        assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
+
+        ChangeLedgerCallback persistCallback =
+            new ChangeLedgerCallback(1, NUM_MESSAGES_TO_TEST);
+        for (Message message : messages) {
+            bkpm.persistMessage(new PersistRequest(topic, message, persistCallback, null));
+        }
+        assertEquals(maxEntriesPerLedger + 1,
+                     ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent());
+        assertEquals(maxEntriesPerLedger, persistCallback.numSuccess);
+        assertEquals(NUM_MESSAGES_TO_TEST - maxEntriesPerLedger, persistCallback.numFailed);
+    }
 }



Mime
View raw message