zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1559192 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ bookkeeper-...
Date Fri, 17 Jan 2014 17:13:43 GMT
Author: ivank
Date: Fri Jan 17 17:13:43 2014
New Revision: 1559192

URL: http://svn.apache.org/r1559192
Log:
BOOKKEEPER-710: OpenLedgerNoRecovery should watch ensemble change. (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1559192&r1=1559191&r2=1559192&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Jan 17 17:13:43 2014
@@ -136,6 +136,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-662: Major GC should kick in immediately if remaining space reaches a
warning threshold (sijie via ivank)
 
+        BOOKKEEPER-710: OpenLedgerNoRecovery should watch ensemble change. (sijie via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1559192&r1=1559191&r2=1559192&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
Fri Jan 17 17:13:43 2014
@@ -536,6 +536,16 @@ public class LedgerHandle {
      */
 
     public void asyncReadLastConfirmed(final ReadLastConfirmedCallback cb, final Object ctx)
{
+        boolean isClosed;
+        long lastEntryId;
+        synchronized (this) {
+            isClosed = metadata.isClosed();
+            lastEntryId = metadata.getLastEntryId();
+        }
+        if (isClosed) {
+            cb.readLastConfirmedComplete(BKException.Code.OK, lastEntryId, ctx);
+            return;
+        }
         ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback()
{
                 @Override
                 public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData
data) {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=1559192&r1=1559191&r2=1559192&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
Fri Jan 17 17:13:43 2014
@@ -138,7 +138,7 @@ class LedgerOpenOp implements GenericCal
 
         // get the ledger metadata back
         try {
-            lh = new ReadOnlyLedgerHandle(bk, ledgerId, metadata, digestType, passwd);
+            lh = new ReadOnlyLedgerHandle(bk, ledgerId, metadata, digestType, passwd, !doRecovery);
         } catch (GeneralSecurityException e) {
             LOG.error("Security exception while opening ledger: " + ledgerId, e);
             cb.openComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java?rev=1559192&r1=1559191&r2=1559192&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
Fri Jan 17 17:13:43 2014
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,46 +18,76 @@ package org.apache.bookkeeper.client;
  * under the License.
  *
  */
+package org.apache.bookkeeper.client;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.versioning.Version;
+
 import java.security.GeneralSecurityException;
 import java.net.InetSocketAddress;
+import java.util.concurrent.RejectedExecutionException;
 
 /**
- * Read only ledger handle. This ledger handle allows you to 
- * read from a ledger but not to write to it. It overrides all 
+ * Read only ledger handle. This ledger handle allows you to
+ * read from a ledger but not to write to it. It overrides all
  * the public write operations from LedgerHandle.
  * It should be returned for BookKeeper#openLedger operations.
  */
-class ReadOnlyLedgerHandle extends LedgerHandle {
+class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListener {
+
+    class MetadataUpdater extends SafeRunnable {
+
+        final LedgerMetadata m;
+
+        MetadataUpdater(LedgerMetadata metadata) {
+            this.m = metadata;
+        }
+
+        @Override
+        public void safeRun() {
+            Version.Occurred occurred =
+                    ReadOnlyLedgerHandle.this.metadata.getVersion().compare(this.m.getVersion());
+            if (Version.Occurred.BEFORE == occurred) {
+                LOG.info("Updated ledger metadata for ledger {} to {}.", ledgerId, this.m);
+                ReadOnlyLedgerHandle.this.metadata = this.m;
+            }
+        }
+    }
+
     ReadOnlyLedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
-                         DigestType digestType, byte[] password)
+                         DigestType digestType, byte[] password, boolean watch)
             throws GeneralSecurityException, NumberFormatException {
         super(bk, ledgerId, metadata, digestType, password);
+        if (watch) {
+            bk.getLedgerManager().registerLedgerMetadataListener(ledgerId, this);
+        }
     }
 
     @Override
-    public void close() 
+    public void close()
             throws InterruptedException, BKException {
-        // noop
+        bk.getLedgerManager().unregisterLedgerMetadataListener(ledgerId, this);
     }
 
     @Override
     public void asyncClose(CloseCallback cb, Object ctx) {
+        bk.getLedgerManager().unregisterLedgerMetadataListener(ledgerId, this);
         cb.closeComplete(BKException.Code.OK, this, ctx);
     }
-    
+
     @Override
     public long addEntry(byte[] data) throws InterruptedException, BKException {
         return addEntry(data, 0, data.length);
     }
-    
+
     @Override
     public long addEntry(byte[] data, int offset, int length)
             throws InterruptedException, BKException {
-        LOG.error("Tried to add entry on a Read-Only ledger handle, ledgerid=" + ledgerId);
       
+        LOG.error("Tried to add entry on a Read-Only ledger handle, ledgerid=" + ledgerId);
         throw BKException.create(BKException.Code.IllegalOpException);
     }
 
@@ -103,4 +131,37 @@ class ReadOnlyLedgerHandle extends Ledge
             }
         }
     }
+
+    @Override
+    public void onChanged(long lid, LedgerMetadata newMetadata) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Received ledger metadata update on {} : {}", lid, newMetadata);
+        }
+        if (this.ledgerId != lid) {
+            return;
+        }
+        if (null == newMetadata) {
+            return;
+        }
+        Version.Occurred occurred =
+                this.metadata.getVersion().compare(newMetadata.getVersion());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Try to update metadata from {} to {} : {}",
+                    new Object[] { this.metadata, newMetadata, occurred });
+        }
+        if (Version.Occurred.BEFORE == occurred) { // the metadata is updated
+            try {
+                bk.mainWorkerPool.submitOrdered(ledgerId, new MetadataUpdater(newMetadata));
+            } catch (RejectedExecutionException ree) {
+                LOG.error("Failed on submitting updater to update ledger metadata on ledger
{} : {}",
+                        ledgerId, newMetadata);
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return String.format("ReadOnlyLedgerHandle(lid = %d, id = %d)", ledgerId, super.hashCode());
+    }
+
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java?rev=1559192&r1=1559191&r2=1559192&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
Fri Jan 17 17:13:43 2014
@@ -18,15 +18,22 @@
 package org.apache.bookkeeper.meta;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.util.BookKeeperConstants;
@@ -38,22 +45,81 @@ import org.apache.zookeeper.AsyncCallbac
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Abstract ledger manager based on zookeeper, which provides common methods such as query
zk nodes.
  */
-abstract class AbstractZkLedgerManager implements LedgerManager {
+abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
 
     private final static Logger LOG = LoggerFactory.getLogger(AbstractZkLedgerManager.class);
 
+    static int ZK_CONNECT_BACKOFF_MS = 200;
+
     protected final AbstractConfiguration conf;
     protected final ZooKeeper zk;
     protected final String ledgerRootPath;
 
+    // ledger metadata listeners
+    protected final ConcurrentMap<Long, Set<LedgerMetadataListener>> listeners
=
+            new ConcurrentHashMap<Long, Set<LedgerMetadataListener>>();
+    // we use this to prevent long stack chains from building up in callbacks
+    protected ScheduledExecutorService scheduler;
+
+    protected class ReadLedgerMetadataTask implements Runnable, GenericCallback<LedgerMetadata>
{
+
+        final long ledgerId;
+
+        ReadLedgerMetadataTask(long ledgerId) {
+            this.ledgerId = ledgerId;
+        }
+
+        @Override
+        public void run() {
+            if (null != listeners.get(ledgerId)) {
+                LOG.debug("Re-read ledger metadata for {}.", ledgerId);
+                readLedgerMetadata(ledgerId, this, AbstractZkLedgerManager.this);
+            } else {
+                LOG.debug("Ledger metadata listener for ledger {} is already removed.", ledgerId);
+            }
+        }
+
+        @Override
+        public void operationComplete(int rc, final LedgerMetadata result) {
+            if (BKException.Code.OK == rc) {
+                final Set<LedgerMetadataListener> listenerSet = listeners.get(ledgerId);
+                if (null != listenerSet) {
+                    LOG.debug("Ledger metadata is changed for {} : {}.", ledgerId, result);
+                    scheduler.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            for (LedgerMetadataListener listener : listenerSet) {
+                                listener.onChanged(ledgerId, result);
+                            }
+                        }
+                    });
+                }
+            } else if (BKException.Code.NoSuchLedgerExistsException == rc) {
+                // the ledger is removed, do nothing
+                Set<LedgerMetadataListener> listenerSet = listeners.remove(ledgerId);
+                if (null != listenerSet) {
+                    LOG.debug("Removed ledger metadata listener set on ledger {} as its ledger
is deleted : {}",
+                            ledgerId, listenerSet.size());
+                }
+            } else {
+                LOG.warn("Failed on read ledger metadata of ledger {} : {}", ledgerId, rc);
+                scheduler.schedule(this, ZK_CONNECT_BACKOFF_MS, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
     /**
      * ZooKeeper-based Ledger Manager Constructor
      *
@@ -66,6 +132,12 @@ abstract class AbstractZkLedgerManager i
         this.conf = conf;
         this.zk = zk;
         this.ledgerRootPath = conf.getZkLedgersRootPath();
+        this.scheduler = Executors.newSingleThreadScheduledExecutor();
+        ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat(
+                "ZkLedgerManagerScheduler-%d");
+        this.scheduler = Executors
+                .newSingleThreadScheduledExecutor(tfb.build());
+        LOG.debug("Using AbstractZkLedgerManager with root path : {}", ledgerRootPath);
     }
 
     /**
@@ -87,6 +159,53 @@ abstract class AbstractZkLedgerManager i
      */
     protected abstract long getLedgerId(String ledgerPath) throws IOException;
 
+    @Override
+    public void process(WatchedEvent event) {
+        LOG.info("Received watched event {} from zookeeper based ledger manager.", event);
+        if (Event.EventType.None == event.getType()) {
+            /** TODO: BOOKKEEPER-537 to handle expire events.
+            if (Event.KeeperState.Expired == event.getState()) {
+                LOG.info("ZooKeeper client expired on ledger manager.");
+                Set<Long> keySet = new HashSet<Long>(listeners.keySet());
+                for (Long lid : keySet) {
+                    scheduler.submit(new ReadLedgerMetadataTask(lid));
+                    LOG.info("Re-read ledger metadata for {} after zookeeper session expired.",
lid);
+                }
+            }
+            **/
+            return;
+        }
+        String path = event.getPath();
+        if (null == path) {
+            return;
+        }
+        final long ledgerId;
+        try {
+            ledgerId = getLedgerId(event.getPath());
+        } catch (IOException ioe) {
+            LOG.info("Received invalid ledger path {} : ", event.getPath(), ioe);
+            return;
+        }
+        switch (event.getType()) {
+        case NodeDeleted:
+            Set<LedgerMetadataListener> listenerSet = listeners.remove(ledgerId);
+            if (null != listenerSet) {
+                LOG.debug("Removed ledger metadata listeners on ledger {} : {}",
+                        ledgerId, listenerSet);
+            } else {
+                LOG.debug("No ledger metadata listeners to remove from ledger {} after it's
deleted.",
+                        ledgerId);
+            }
+            break;
+        case NodeDataChanged:
+            new ReadLedgerMetadataTask(ledgerId).run();
+            break;
+        default:
+            LOG.debug("Received event {} on {}.", event.getType(), event.getPath());
+            break;
+        }
+    }
+
     /**
      * Removes ledger metadata from ZooKeeper if version matches.
      *
@@ -120,6 +239,15 @@ abstract class AbstractZkLedgerManager i
                     LOG.warn("Ledger node does not exist in ZooKeeper: ledgerId={}", ledgerId);
                     bkRc = BKException.Code.NoSuchLedgerExistsException;
                 } else if (rc == KeeperException.Code.OK.intValue()) {
+                    // removed listener on ledgerId
+                    Set<LedgerMetadataListener> listenerSet = listeners.remove(ledgerId);
+                    if (null != listenerSet) {
+                        LOG.debug("Remove registered ledger metadata listeners on ledger
{} after ledger is deleted.",
+                                ledgerId, listenerSet);
+                    } else {
+                        LOG.debug("No ledger metadata listeners to remove from ledger {}
when it's being deleted.",
+                                ledgerId);
+                    }
                     bkRc = BKException.Code.OK;
                 } else {
                     bkRc = BKException.Code.ZKException;
@@ -130,8 +258,49 @@ abstract class AbstractZkLedgerManager i
     }
 
     @Override
+    public void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener)
{
+        if (null != listener) {
+            LOG.info("Registered ledger metadata listener {} on ledger {}.", listener, ledgerId);
+            Set<LedgerMetadataListener> listenerSet = listeners.get(ledgerId);
+            if (listenerSet == null) {
+                Set<LedgerMetadataListener> newListenerSet = new HashSet<LedgerMetadataListener>();
+                Set<LedgerMetadataListener> oldListenerSet = listeners.putIfAbsent(ledgerId,
newListenerSet);
+                if (null != oldListenerSet) {
+                    listenerSet = oldListenerSet;
+                } else {
+                    listenerSet = newListenerSet;
+                }
+            }
+            synchronized (listenerSet) {
+                listenerSet.add(listener);
+            }
+            new ReadLedgerMetadataTask(ledgerId).run();
+        }
+    }
+
+    @Override
+    public void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener)
{
+        Set<LedgerMetadataListener> listenerSet = listeners.get(ledgerId);
+        if (listenerSet != null) {
+            synchronized (listenerSet) {
+                if (listenerSet.remove(listener)) {
+                    LOG.info("Unregistered ledger metadata listener {} on ledger {}.", listener,
ledgerId);
+                }
+                if (listenerSet.isEmpty()) {
+                    listeners.remove(ledgerId, listenerSet);
+                }
+            }
+        }
+    }
+
+    @Override
     public void readLedgerMetadata(final long ledgerId, final GenericCallback<LedgerMetadata>
readCb) {
-        zk.getData(getLedgerPath(ledgerId), false, new DataCallback() {
+        readLedgerMetadata(ledgerId, readCb, null);
+    }
+
+    protected void readLedgerMetadata(final long ledgerId, final GenericCallback<LedgerMetadata>
readCb,
+                                      Watcher watcher) {
+        zk.getData(getLedgerPath(ledgerId), watcher, new DataCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx, byte[] data, Stat
stat) {
                 if (rc == KeeperException.Code.NONODE.intValue()) {
@@ -296,5 +465,10 @@ abstract class AbstractZkLedgerManager i
 
     @Override
     public void close() {
+        try {
+            scheduler.shutdown();
+        } catch (Exception e) {
+            LOG.warn("Error when closing zookeeper based ledger manager: ", e);
+        }
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java?rev=1559192&r1=1559191&r2=1559192&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
Fri Jan 17 17:13:43 2014
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.meta;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,6 +15,7 @@ package org.apache.bookkeeper.meta;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.bookkeeper.meta;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -45,8 +44,6 @@ import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 /**
  * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical
znodes.
  *
@@ -74,9 +71,6 @@ class HierarchicalLedgerManager extends 
     // Path to generate global id
     private final String idGenPath;
 
-    // we use this to prevent long stack chains from building up in callbacks
-    ScheduledExecutorService scheduler;
-
     /**
      * Constructor
      *
@@ -89,21 +83,6 @@ class HierarchicalLedgerManager extends 
         super(conf, zk);
 
         this.idGenPath = ledgerRootPath + IDGENERATION_PREFIX;
-        ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat(
-                "HierarchialLedgerManagerScheduler-%d");
-        this.scheduler = Executors
-                .newSingleThreadScheduledExecutor(tfb.build());
-        LOG.debug("Using HierarchicalLedgerManager with root path : {}", ledgerRootPath);
-    }
-
-    @Override
-    public void close() {
-        try {
-            scheduler.shutdown();
-        } catch (Exception e) {
-            LOG.warn("Error when closing HierarchicalLedgerManager : ", e);
-        }
-        super.close();
     }
 
     @Override

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java?rev=1559192&r1=1559191&r2=1559192&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
Fri Jan 17 17:13:43 2014
@@ -27,6 +27,7 @@ import java.util.TreeSet;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.versioning.Version;
@@ -94,6 +95,26 @@ public interface LedgerManager extends C
     public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void>
cb);
 
     /**
+     * Register the ledger metadata <i>listener</i> on <i>ledgerId</i>.
+     *
+     * @param ledgerId
+     *          ledger id.
+     * @param listener
+     *          listener.
+     */
+    public abstract void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener
listener);
+
+    /**
+     * Unregister the ledger metadata <i>listener</i> on <i>ledgerId</i>.
+     *
+     * @param ledgerId
+     *          ledger id.
+     * @param listener
+     *          ledger metadata listener.
+     */
+    public abstract void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListener
listener);
+
+    /**
      * Loop to process all ledgers.
      * <p>
      * <ul>

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java?rev=1559192&r1=1559191&r2=1559192&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
Fri Jan 17 17:13:43 2014
@@ -43,6 +43,7 @@ import org.apache.bookkeeper.metastore.M
 import org.apache.bookkeeper.metastore.MetastoreTableItem;
 import org.apache.bookkeeper.metastore.Value;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.util.StringUtils;
@@ -210,6 +211,16 @@ public class MSLedgerManagerFactory exte
         }
 
         @Override
+        public void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener
listener) {
+            // TODO: should provide ledger metadata listener in metadata store.
+        }
+
+        @Override
+        public void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListener
listener) {
+            // TODO: should provide ledger metadata listener in metadata store.
+        }
+
+        @Override
         public void close() {
             try {
                 scheduler.shutdown();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java?rev=1559192&r1=1559191&r2=1559192&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
Fri Jan 17 17:13:43 2014
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.zookeeper.AsyncCallback;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
@@ -45,6 +46,21 @@ public class BookkeeperInternalCallbacks
      *
      */
 
+    /**
+     * Listener on ledger metadata changes.
+     */
+    public interface LedgerMetadataListener {
+        /**
+         * Triggered each time ledger metadata changed.
+         *
+         * @param ledgerId
+         *          ledger id.
+         * @param metadata
+         *          new ledger metadata.
+         */
+        void onChanged(long ledgerId, LedgerMetadata metadata);
+    }
+
     public interface WriteCallback {
         void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object
ctx);
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java?rev=1559192&r1=1559191&r2=1559192&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
Fri Jan 17 17:13:43 2014
@@ -54,7 +54,7 @@ public class BookieLedgerIndexer {
     /**
      * Generating bookie vs its ledgers map by reading all the ledgers in each
      * bookie and parsing its metadata.
-     * 
+     *
      * @return bookie2ledgersMap map of bookie vs ledgers
      * @throws BKAuditException
      *             exception while getting bookie-ledgers

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1559192&r1=1559191&r2=1559192&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
Fri Jan 17 17:13:43 2014
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.conf.Server
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.MathUtils;
@@ -470,6 +471,16 @@ public class CompactionTest extends Book
                     unsupported();
                 }
                 @Override
+                public void registerLedgerMetadataListener(long ledgerId,
+                        LedgerMetadataListener listener) {
+                    unsupported();
+                }
+                @Override
+                public void unregisterLedgerMetadataListener(long ledgerId,
+                        LedgerMetadataListener listener) {
+                    unsupported();
+                }
+                @Override
                 public void close() throws IOException {}
 
                 void unsupported() {

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java?rev=1559192&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
(added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
Fri Jan 17 17:13:43 2014
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+public class TestWatchEnsembleChange extends BookKeeperClusterTestCase {
+
+    static Logger LOG = LoggerFactory.getLogger(TestWatchEnsembleChange.class);
+
+    final DigestType digestType;
+
+    public TestWatchEnsembleChange() {
+        super(7);
+        this.digestType = DigestType.CRC32;
+    }
+
+    @Test(timeout = 60000)
+    public void testWatchEnsembleChange() throws Exception {
+        int numEntries = 10;
+        LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, "".getBytes());
+        for (int i=0; i<numEntries; i++) {
+            lh.addEntry(("data" + i).getBytes());
+            LOG.info("Added entry {}.", i);
+        }
+        LedgerHandle readLh = bkc.openLedgerNoRecovery(lh.getId(), digestType, "".getBytes());
+        long lastLAC = readLh.getLastAddConfirmed();
+        assertEquals(numEntries - 2, lastLAC);
+        ArrayList<InetSocketAddress> ensemble =
+                lh.getLedgerMetadata().currentEnsemble;
+        for (InetSocketAddress addr : ensemble) {
+            killBookie(addr);
+        }
+        // write another batch of entries, which will trigger ensemble change
+        for (int i=0; i<numEntries; i++) {
+            lh.addEntry(("data" + (numEntries + i)).getBytes());
+            LOG.info("Added entry {}.", (numEntries + i));
+        }
+        TimeUnit.SECONDS.sleep(5);
+        readLh.readLastConfirmed();
+        assertEquals(2 * numEntries - 2, readLh.getLastAddConfirmed());
+        readLh.close();
+        lh.close();
+    }
+}



Mime
View raw message