Author: fpj
Date: Fri Apr 11 23:34:52 2014
New Revision: 1586799
URL: http://svn.apache.org/r1586799
Log:
BOOKKEEPER-710. OpenLedgerNoRecovery should watch ensemble change. (sijie, ivank via fpj)
Added:
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
Modified:
zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
Modified: zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt?rev=1586799&r1=1586798&r2=1586799&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt Fri Apr 11 23:34:52 2014
@@ -17,6 +17,8 @@ Release 4.2.3 - 2013-12-04
BOOKKEEPER-714: Logging channel exceptions in PerChannelBookieClient (sijie)
BOOKKEEPER-726: PerChannelBookieClient should print address that it failed to connect
to when it fails to correct (ivank via sijie)
+
+ BOOKKEEPER-710: OpenLedgerNoRecovery should watch ensemble change. (sijie, ivank via fpj)
Release 4.2.2 - 2013-10-02
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1586799&r1=1586798&r2=1586799&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
(original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
Fri Apr 11 23:34:52 2014
@@ -539,6 +539,16 @@ public class LedgerHandle {
*/
public void asyncReadLastConfirmed(final ReadLastConfirmedCallback cb, final Object ctx)
{
+ boolean isClosed;
+ long lastEntryId;
+ synchronized (this) {
+ isClosed = metadata.isClosed();
+ lastEntryId = metadata.getLastEntryId();
+ }
+ if (isClosed) {
+ cb.readLastConfirmedComplete(BKException.Code.OK, lastEntryId, ctx);
+ return;
+ }
ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback()
{
@Override
public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData
data) {
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=1586799&r1=1586798&r2=1586799&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
(original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
Fri Apr 11 23:34:52 2014
@@ -138,7 +138,7 @@ class LedgerOpenOp implements GenericCal
// get the ledger metadata back
try {
- lh = new ReadOnlyLedgerHandle(bk, ledgerId, metadata, digestType, passwd);
+ lh = new ReadOnlyLedgerHandle(bk, ledgerId, metadata, digestType, passwd, !doRecovery);
} catch (GeneralSecurityException e) {
LOG.error("Security exception while opening ledger: " + ledgerId, e);
cb.openComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java?rev=1586799&r1=1586798&r2=1586799&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
(original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
Fri Apr 11 23:34:52 2014
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,46 +18,76 @@ package org.apache.bookkeeper.client;
* under the License.
*
*/
+package org.apache.bookkeeper.client;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.versioning.Version;
+
import java.security.GeneralSecurityException;
import java.net.InetSocketAddress;
+import java.util.concurrent.RejectedExecutionException;
/**
- * Read only ledger handle. This ledger handle allows you to
- * read from a ledger but not to write to it. It overrides all
+ * Read only ledger handle. This ledger handle allows you to
+ * read from a ledger but not to write to it. It overrides all
* the public write operations from LedgerHandle.
* It should be returned for BookKeeper#openLedger operations.
*/
-class ReadOnlyLedgerHandle extends LedgerHandle {
+class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListener {
+
+ class MetadataUpdater extends SafeRunnable {
+
+ final LedgerMetadata m;
+
+ MetadataUpdater(LedgerMetadata metadata) {
+ this.m = metadata;
+ }
+
+ @Override
+ public void safeRun() {
+ Version.Occurred occurred =
+ ReadOnlyLedgerHandle.this.metadata.getVersion().compare(this.m.getVersion());
+ if (Version.Occurred.BEFORE == occurred) {
+ LOG.info("Updated ledger metadata for ledger {} to {}.", ledgerId, this.m);
+ ReadOnlyLedgerHandle.this.metadata = this.m;
+ }
+ }
+ }
+
ReadOnlyLedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
- DigestType digestType, byte[] password)
+ DigestType digestType, byte[] password, boolean watch)
throws GeneralSecurityException, NumberFormatException {
super(bk, ledgerId, metadata, digestType, password);
+ if (watch) {
+ bk.getLedgerManager().registerLedgerMetadataListener(ledgerId, this);
+ }
}
@Override
- public void close()
+ public void close()
throws InterruptedException, BKException {
- // noop
+ bk.getLedgerManager().unregisterLedgerMetadataListener(ledgerId, this);
}
@Override
public void asyncClose(CloseCallback cb, Object ctx) {
+ bk.getLedgerManager().unregisterLedgerMetadataListener(ledgerId, this);
cb.closeComplete(BKException.Code.OK, this, ctx);
}
-
+
@Override
public long addEntry(byte[] data) throws InterruptedException, BKException {
return addEntry(data, 0, data.length);
}
-
+
@Override
public long addEntry(byte[] data, int offset, int length)
throws InterruptedException, BKException {
- LOG.error("Tried to add entry on a Read-Only ledger handle, ledgerid=" + ledgerId);
+ LOG.error("Tried to add entry on a Read-Only ledger handle, ledgerid=" + ledgerId);
throw BKException.create(BKException.Code.IllegalOpException);
}
@@ -103,4 +131,37 @@ class ReadOnlyLedgerHandle extends Ledge
}
}
}
+
+ @Override
+ public void onChanged(long lid, LedgerMetadata newMetadata) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received ledger metadata update on {} : {}", lid, newMetadata);
+ }
+ if (this.ledgerId != lid) {
+ return;
+ }
+ if (null == newMetadata) {
+ return;
+ }
+ Version.Occurred occurred =
+ this.metadata.getVersion().compare(newMetadata.getVersion());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Try to update metadata from {} to {} : {}",
+ new Object[] { this.metadata, newMetadata, occurred });
+ }
+ if (Version.Occurred.BEFORE == occurred) { // the metadata is updated
+ try {
+ bk.mainWorkerPool.submitOrdered(ledgerId, new MetadataUpdater(newMetadata));
+ } catch (RejectedExecutionException ree) {
+ LOG.error("Failed on submitting updater to update ledger metadata on ledger {} : {}",
+ ledgerId, newMetadata);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("ReadOnlyLedgerHandle(lid = %d, id = %d)", ledgerId, super.hashCode());
+ }
+
}
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java?rev=1586799&r1=1586798&r2=1586799&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
(original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
Fri Apr 11 23:34:52 2014
@@ -18,15 +18,22 @@
package org.apache.bookkeeper.meta;
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.util.BookKeeperConstants;
@@ -38,22 +45,82 @@ import org.apache.zookeeper.AsyncCallbac
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* Abstract ledger manager based on zookeeper, which provides common methods such as query
zk nodes.
*/
-abstract class AbstractZkLedgerManager implements LedgerManager {
+abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
static Logger LOG = LoggerFactory.getLogger(AbstractZkLedgerManager.class);
+ static int ZK_CONNECT_BACKOFF_MS = 200;
+
protected final AbstractConfiguration conf;
protected final ZooKeeper zk;
protected final String ledgerRootPath;
+ // ledger metadata listeners
+ protected final ConcurrentMap<Long, Set<LedgerMetadataListener>> listeners =
+ new ConcurrentHashMap<Long, Set<LedgerMetadataListener>>();
+ // we use this to prevent long stack chains from building up in callbacks
+ protected ScheduledExecutorService scheduler;
+
+ protected class ReadLedgerMetadataTask implements Runnable, GenericCallback<LedgerMetadata> {
+
+ final long ledgerId;
+
+ ReadLedgerMetadataTask(long ledgerId) {
+ this.ledgerId = ledgerId;
+ }
+
+ @Override
+ public void run() {
+ if (null != listeners.get(ledgerId)) {
+ LOG.debug("Re-read ledger metadata for {}.", ledgerId);
+ readLedgerMetadata(ledgerId, this, AbstractZkLedgerManager.this);
+ } else {
+ LOG.debug("Ledger metadata listener for ledger {} is already removed.", ledgerId);
+ }
+ }
+
+ @Override
+ public void operationComplete(int rc, final LedgerMetadata result) {
+ if (BKException.Code.OK == rc) {
+ final Set<LedgerMetadataListener> listenerSet = listeners.get(ledgerId);
+ if (null != listenerSet) {
+ LOG.debug("Ledger metadata is changed for {} : {}.", ledgerId, result);
+ scheduler.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (LedgerMetadataListener listener : listenerSet) {
+ listener.onChanged(ledgerId, result);
+ }
+ }
+ });
+ }
+ } else if (BKException.Code.NoSuchLedgerExistsException == rc) {
+ // the ledger is removed, do nothing
+ Set<LedgerMetadataListener> listenerSet = listeners.remove(ledgerId);
+ if (null != listenerSet) {
+ LOG.debug("Removed ledger metadata listener set on ledger {} as its ledger is deleted : {}",
+ ledgerId, listenerSet.size());
+ }
+ } else {
+ LOG.warn("Failed on read ledger metadata of ledger {} : {}", ledgerId, rc);
+ scheduler.schedule(this, ZK_CONNECT_BACKOFF_MS, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+
/**
* ZooKeeper-based Ledger Manager Constructor
*
@@ -66,6 +133,12 @@ abstract class AbstractZkLedgerManager i
this.conf = conf;
this.zk = zk;
this.ledgerRootPath = conf.getZkLedgersRootPath();
+ this.scheduler = Executors.newSingleThreadScheduledExecutor();
+ ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat(
+ "ZkLedgerManagerScheduler-%d");
+ this.scheduler = Executors
+ .newSingleThreadScheduledExecutor(tfb.build());
+ LOG.debug("Using AbstractZkLedgerManager with root path : {}", ledgerRootPath);
}
/**
@@ -87,6 +160,53 @@ abstract class AbstractZkLedgerManager i
*/
protected abstract long getLedgerId(String ledgerPath) throws IOException;
+ @Override
+ public void process(WatchedEvent event) {
+ LOG.info("Received watched event {} from zookeeper based ledger manager.", event);
+ if (Event.EventType.None == event.getType()) {
+ /** TODO: BOOKKEEPER-537 to handle expire events.
+ if (Event.KeeperState.Expired == event.getState()) {
+ LOG.info("ZooKeeper client expired on ledger manager.");
+ Set<Long> keySet = new HashSet<Long>(listeners.keySet());
+ for (Long lid : keySet) {
+ scheduler.submit(new ReadLedgerMetadataTask(lid));
+ LOG.info("Re-read ledger metadata for {} after zookeeper session expired.", lid);
+ }
+ }
+ **/
+ return;
+ }
+ String path = event.getPath();
+ if (null == path) {
+ return;
+ }
+ final long ledgerId;
+ try {
+ ledgerId = getLedgerId(event.getPath());
+ } catch (IOException ioe) {
+ LOG.info("Received invalid ledger path {} : ", event.getPath(), ioe);
+ return;
+ }
+ switch (event.getType()) {
+ case NodeDeleted:
+ Set<LedgerMetadataListener> listenerSet = listeners.remove(ledgerId);
+ if (null != listenerSet) {
+ LOG.debug("Removed ledger metadata listeners on ledger {} : {}",
+ ledgerId, listenerSet);
+ } else {
+ LOG.debug("No ledger metadata listeners to remove from ledger {} after it's deleted.",
+ ledgerId);
+ }
+ break;
+ case NodeDataChanged:
+ new ReadLedgerMetadataTask(ledgerId).run();
+ break;
+ default:
+ LOG.debug("Received event {} on {}.", event.getType(), event.getPath());
+ break;
+ }
+ }
+
/**
* Removes ledger metadata from ZooKeeper if version matches.
*
@@ -120,6 +240,15 @@ abstract class AbstractZkLedgerManager i
LOG.warn("Ledger node does not exist in ZooKeeper: ledgerId={}", ledgerId);
bkRc = BKException.Code.NoSuchLedgerExistsException;
} else if (rc == KeeperException.Code.OK.intValue()) {
+ // removed listener on ledgerId
+ Set<LedgerMetadataListener> listenerSet = listeners.remove(ledgerId);
+ if (null != listenerSet) {
+ LOG.debug("Remove registered ledger metadata listeners on ledger {} after ledger is deleted.",
+ ledgerId, listenerSet);
+ } else {
+ LOG.debug("No ledger metadata listeners to remove from ledger {} when it's being deleted.",
+ ledgerId);
+ }
bkRc = BKException.Code.OK;
} else {
bkRc = BKException.Code.ZKException;
@@ -130,8 +259,49 @@ abstract class AbstractZkLedgerManager i
}
@Override
+ public void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {
+ if (null != listener) {
+ LOG.info("Registered ledger metadata listener {} on ledger {}.", listener, ledgerId);
+ Set<LedgerMetadataListener> listenerSet = listeners.get(ledgerId);
+ if (listenerSet == null) {
+ Set<LedgerMetadataListener> newListenerSet = new HashSet<LedgerMetadataListener>();
+ Set<LedgerMetadataListener> oldListenerSet = listeners.putIfAbsent(ledgerId, newListenerSet);
+ if (null != oldListenerSet) {
+ listenerSet = oldListenerSet;
+ } else {
+ listenerSet = newListenerSet;
+ }
+ }
+ synchronized (listenerSet) {
+ listenerSet.add(listener);
+ }
+ new ReadLedgerMetadataTask(ledgerId).run();
+ }
+ }
+
+ @Override
+ public void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {
+ Set<LedgerMetadataListener> listenerSet = listeners.get(ledgerId);
+ if (listenerSet != null) {
+ synchronized (listenerSet) {
+ if (listenerSet.remove(listener)) {
+ LOG.info("Unregistered ledger metadata listener {} on ledger {}.", listener, ledgerId);
+ }
+ if (listenerSet.isEmpty()) {
+ listeners.remove(ledgerId, listenerSet);
+ }
+ }
+ }
+ }
+
+ @Override
public void readLedgerMetadata(final long ledgerId, final GenericCallback<LedgerMetadata>
readCb) {
- zk.getData(getLedgerPath(ledgerId), false, new DataCallback() {
+ readLedgerMetadata(ledgerId, readCb, null);
+ }
+
+ protected void readLedgerMetadata(final long ledgerId, final GenericCallback<LedgerMetadata> readCb,
+ Watcher watcher) {
+ zk.getData(getLedgerPath(ledgerId), watcher, new DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat
stat) {
if (rc == KeeperException.Code.NONODE.intValue()) {
@@ -296,5 +466,10 @@ abstract class AbstractZkLedgerManager i
@Override
public void close() {
+ try {
+ scheduler.shutdown();
+ } catch (Exception e) {
+ LOG.warn("Error when closing zookeeper based ledger manager: ", e);
+ }
}
}
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java?rev=1586799&r1=1586798&r2=1586799&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
(original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
Fri Apr 11 23:34:52 2014
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.meta;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -17,6 +15,7 @@ package org.apache.bookkeeper.meta;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.bookkeeper.meta;
import java.io.IOException;
import java.util.Collections;
@@ -72,9 +71,6 @@ class HierarchicalLedgerManager extends
// Path to generate global id
private final String idGenPath;
- // we use this to prevent long stack chains from building up in callbacks
- ScheduledExecutorService scheduler;
-
/**
* Constructor
*
@@ -87,21 +83,10 @@ class HierarchicalLedgerManager extends
super(conf, zk);
this.idGenPath = ledgerRootPath + IDGENERATION_PREFIX;
- this.scheduler = Executors.newSingleThreadScheduledExecutor();
LOG.debug("Using HierarchicalLedgerManager with root path : {}", ledgerRootPath);
}
@Override
- public void close() {
- try {
- scheduler.shutdown();
- } catch (Exception e) {
- LOG.warn("Error when closing HierarchicalLedgerManager : ", e);
- }
- super.close();
- }
-
- @Override
public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long>
ledgerCb) {
ZkUtils.asyncCreateFullPathOptimistic(zk, idGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() {
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java?rev=1586799&r1=1586798&r2=1586799&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
(original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
Fri Apr 11 23:34:52 2014
@@ -27,6 +27,7 @@ import java.util.TreeSet;
import org.apache.zookeeper.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.versioning.Version;
@@ -94,6 +95,26 @@ public interface LedgerManager extends C
public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void>
cb);
/**
+ * Register the ledger metadata <i>listener</i> on <i>ledgerId</i>.
+ *
+ * @param ledgerId
+ * ledger id.
+ * @param listener
+ * listener.
+ */
+ public abstract void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener);
+
+ /**
+ * Unregister the ledger metadata <i>listener</i> on <i>ledgerId</i>.
+ *
+ * @param ledgerId
+ * ledger id.
+ * @param listener
+ * ledger metadata listener.
+ */
+ public abstract void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener);
+
+ /**
* Loop to process all ledgers.
* <p>
* <ul>
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java?rev=1586799&r1=1586798&r2=1586799&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
(original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
Fri Apr 11 23:34:52 2014
@@ -43,6 +43,7 @@ import org.apache.bookkeeper.metastore.M
import org.apache.bookkeeper.metastore.MetastoreTableItem;
import org.apache.bookkeeper.metastore.Value;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.util.StringUtils;
@@ -205,6 +206,16 @@ public class MSLedgerManagerFactory exte
}
@Override
+ public void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {
+ // TODO BOOKKEEPER-747: should provide ledger metadata listener in metadata store.
+ }
+
+ @Override
+ public void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {
+ // TODO BOOKKEEPER-747: should provide ledger metadata listener in metadata store.
+ }
+
+ @Override
public void close() {
try {
scheduler.shutdown();
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java?rev=1586799&r1=1586798&r2=1586799&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
(original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
Fri Apr 11 23:34:52 2014
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.zookeeper.AsyncCallback;
import org.jboss.netty.buffer.ChannelBuffer;
import org.slf4j.Logger;
@@ -45,6 +46,21 @@ public class BookkeeperInternalCallbacks
*
*/
+ /**
+ * Listener on ledger metadata changes.
+ */
+ public interface LedgerMetadataListener {
+ /**
+ * Triggered each time ledger metadata changed.
+ *
+ * @param ledgerId
+ * ledger id.
+ * @param metadata
+ * new ledger metadata.
+ */
+ void onChanged(long ledgerId, LedgerMetadata metadata);
+ }
+
public interface WriteCallback {
void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object
ctx);
}
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java?rev=1586799&r1=1586798&r2=1586799&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
(original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
Fri Apr 11 23:34:52 2014
@@ -54,7 +54,7 @@ public class BookieLedgerIndexer {
/**
* Generating bookie vs its ledgers map by reading all the ledgers in each
* bookie and parsing its metadata.
- *
+ *
* @return bookie2ledgersMap map of bookie vs ledgers
* @throws BKAuditException
* exception while getting bookie-ledgers
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1586799&r1=1586798&r2=1586799&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
(original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
Fri Apr 11 23:34:52 2014
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.bookie;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,6 +18,8 @@ package org.apache.bookkeeper.bookie;
* under the License.
*
*/
+package org.apache.bookkeeper.bookie;
+
import java.io.File;
import java.io.IOException;
import java.io.IOException;
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.util.TestUt
import org.apache.zookeeper.AsyncCallback;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.versioning.Version;
@@ -344,6 +345,16 @@ public class CompactionTest extends Book
unsupported();
}
@Override
+ public void registerLedgerMetadataListener(long ledgerId,
+ LedgerMetadataListener listener) {
+ unsupported();
+ }
+ @Override
+ public void unregisterLedgerMetadataListener(long ledgerId,
+ LedgerMetadataListener listener) {
+ unsupported();
+ }
+ @Override
public void close() throws IOException {}
void unsupported() {
Added: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java?rev=1586799&view=auto
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
(added)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
Fri Apr 11 23:34:52 2014
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+public class TestWatchEnsembleChange extends BookKeeperClusterTestCase {
+
+ static Logger LOG = LoggerFactory.getLogger(TestWatchEnsembleChange.class);
+
+ final DigestType digestType;
+
+ public TestWatchEnsembleChange() {
+ super(7);
+ this.digestType = DigestType.CRC32;
+ }
+
+ @Test(timeout = 60000)
+ public void testWatchEnsembleChange() throws Exception {
+ int numEntries = 10;
+ LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, "".getBytes());
+ for (int i=0; i<numEntries; i++) {
+ lh.addEntry(("data" + i).getBytes());
+ LOG.info("Added entry {}.", i);
+ }
+ LedgerHandle readLh = bkc.openLedgerNoRecovery(lh.getId(), digestType, "".getBytes());
+ long lastLAC = readLh.getLastAddConfirmed();
+ assertEquals(numEntries - 2, lastLAC);
+ ArrayList<InetSocketAddress> ensemble =
+ lh.getLedgerMetadata().currentEnsemble;
+ for (InetSocketAddress addr : ensemble) {
+ killBookie(addr);
+ }
+ // write another batch of entries, which will trigger ensemble change
+ for (int i=0; i<numEntries; i++) {
+ lh.addEntry(("data" + (numEntries + i)).getBytes());
+ LOG.info("Added entry {}.", (numEntries + i));
+ }
+ TimeUnit.SECONDS.sleep(5);
+ readLh.readLastConfirmed();
+ assertEquals(2 * numEntries - 2, readLh.getLastAddConfirmed());
+ readLh.close();
+ lh.close();
+ }
+}
|