zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1570647 [1/2] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ bookk...
Date Fri, 21 Feb 2014 18:01:17 GMT
Author: ivank
Date: Fri Feb 21 18:01:17 2014
New Revision: 1570647

URL: http://svn.apache.org/r1570647
Log:
BOOKKEEPER-654: Bookkeeper client operations are allowed even after its closure, bk#close() (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Feb 21 18:01:17 2014
@@ -166,6 +166,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-724: Shade introduces RAT error (sijie via fpj)
 
+        BOOKKEEPER-654: Bookkeeper client operations are allowed even after its closure, bk#close() (sijie via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java Fri Feb 21 18:01:17 2014
@@ -90,6 +90,8 @@ public abstract class BKException extend
             return new BKWriteOnReadOnlyBookieException();
         case Code.ReplicationException:
             return new BKReplicationException();
+        case Code.ClientClosedException:
+            return new BKClientClosedException();
         case Code.IllegalOpException:
             return new BKIllegalOpException();
         default:
@@ -121,6 +123,7 @@ public abstract class BKException extend
         int ProtocolVersionException = -16;
         int MetadataVersionException = -17;
         int MetaStoreException = -18;
+        int ClientClosedException = -19;
 
         int IllegalOpException = -100;
         int LedgerFencedException = -101;
@@ -193,6 +196,8 @@ public abstract class BKException extend
             return "Attempting to write on ReadOnly bookie";
         case Code.ReplicationException:
             return "Errors in replication pipeline";
+        case Code.ClientClosedException:
+            return "BookKeeper client is closed";
         case Code.IllegalOpException:
             return "Invalid operation";
         default:
@@ -349,4 +354,11 @@ public abstract class BKException extend
             super(Code.ReplicationException);
         }
     }
+
+    public static class BKClientClosedException extends BKException {
+        public BKClientClosedException() {
+            super(Code.ClientClosedException);
+        }
+    }
+
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Fri Feb 21 18:01:17 2014
@@ -25,13 +25,14 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
-import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.meta.CleanupLedgerManager;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.proto.BookieClient;
@@ -71,8 +72,6 @@ public class BookKeeper {
     static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
 
     final ZooKeeper zk;
-    final CountDownLatch connectLatch = new CountDownLatch(1);
-    final static int zkConnectTimeoutMs = 5000;
     final ClientSocketChannelFactory channelFactory;
 
     // The stats logger for this client.
@@ -105,10 +104,9 @@ public class BookKeeper {
 
     final ClientConfiguration conf;
 
-    interface ZKConnectCallback {
-        public void connected();
-        public void connectionFailed(int code);
-    }
+    // Close State
+    boolean closed = false;
+    final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
 
     public static class Builder {
         final ClientConfiguration conf;
@@ -187,7 +185,7 @@ public class BookKeeper {
 
     /**
      * Create a bookkeeper client using a configuration object.
-     * A zookeeper client and a client socket factory will be 
+     * A zookeeper client and a client socket factory will be
      * instantiated as part of this constructor.
      *
      * @param conf
@@ -222,7 +220,7 @@ public class BookKeeper {
         bookieWatcher.readBookiesBlocking();
 
         ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
-        ledgerManager = ledgerManagerFactory.newLedgerManager();
+        ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
 
         ownChannelFactory = true;
         ownZKHandle = true;
@@ -308,7 +306,7 @@ public class BookKeeper {
         bookieWatcher.readBookiesBlocking();
 
         ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
-        ledgerManager = ledgerManagerFactory.newLedgerManager();
+        ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
     }
 
     private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf)
@@ -321,6 +319,18 @@ public class BookKeeper {
         }
     }
 
+    int getReturnRc(int rc) {
+        if (BKException.Code.OK == rc) {
+            return rc;
+        } else {
+            if (bookieClient.isClosed()) {
+                return BKException.Code.ClientClosedException;
+            } else {
+                return rc;
+            }
+        }
+    }
+
     LedgerManager getLedgerManager() {
         return ledgerManager;
     }
@@ -334,7 +344,7 @@ public class BookKeeper {
      */
     public enum DigestType {
         MAC, CRC32
-    };
+    }
 
     ZooKeeper getZkHandle() {
         return zk;
@@ -427,9 +437,18 @@ public class BookKeeper {
         if (writeQuorumSize < ackQuorumSize) {
             throw new IllegalArgumentException("Write quorum must be larger than ack quorum");
         }
-        new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
-                           ackQuorumSize, digestType, passwd, cb, ctx)
-            .initiate();
+        closeLock.readLock().lock();
+        try {
+            if (closed) {
+                cb.createComplete(BKException.Code.ClientClosedException, null, ctx);
+                return;
+            }
+            new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
+                               ackQuorumSize, digestType, passwd, cb, ctx)
+                .initiate();
+        } finally {
+            closeLock.readLock().unlock();
+        }
     }
 
 
@@ -451,7 +470,7 @@ public class BookKeeper {
 
     /**
      * Synchronous call to create ledger. Parameters match those of
-     * {@link #asyncCreateLedger(int, int, DigestType, byte[], 
+     * {@link #asyncCreateLedger(int, int, DigestType, byte[],
      *                           AsyncCallback.CreateCallback, Object)}
      *
      * @param ensSize
@@ -510,19 +529,19 @@ public class BookKeeper {
 
     /**
      * Open existing ledger asynchronously for reading.
-     * 
-     * Opening a ledger with this method invokes fencing and recovery on the ledger 
-     * if the ledger has not been closed. Fencing will block all other clients from 
-     * writing to the ledger. Recovery will make sure that the ledger is closed 
-     * before reading from it. 
      *
-     * Recovery also makes sure that any entries which reached one bookie, but not a 
+     * Opening a ledger with this method invokes fencing and recovery on the ledger
+     * if the ledger has not been closed. Fencing will block all other clients from
+     * writing to the ledger. Recovery will make sure that the ledger is closed
+     * before reading from it.
+     *
+     * Recovery also makes sure that any entries which reached one bookie, but not a
      * quorum, will be replicated to a quorum of bookies. This occurs in cases were
      * the writer of a ledger crashes after sending a write request to one bookie but
-     * before being able to send it to the rest of the bookies in the quorum. 
+     * before being able to send it to the rest of the bookies in the quorum.
      *
      * If the ledger is already closed, neither fencing nor recovery will be applied.
-     * 
+     *
      * @see LedgerHandle#asyncClose
      *
      * @param lId
@@ -536,7 +555,16 @@ public class BookKeeper {
      */
     public void asyncOpenLedger(final long lId, final DigestType digestType, final byte passwd[],
                                 final OpenCallback cb, final Object ctx) {
-        new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate();
+        closeLock.readLock().lock();
+        try {
+            if (closed) {
+                cb.openComplete(BKException.Code.ClientClosedException, null, ctx);
+                return;
+            }
+            new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate();
+        } finally {
+            closeLock.readLock().unlock();
+        }
     }
 
     /**
@@ -546,14 +574,14 @@ public class BookKeeper {
      * unsealed forever if there is no external mechanism to detect the failure
      * of the writer and the ledger is not open in a safe manner, invoking the
      * recovery procedure.
-     * 
+     *
      * Opening a ledger without recovery does not fence the ledger. As such, other
-     * clients can continue to write to the ledger. 
+     * clients can continue to write to the ledger.
      *
-     * This method returns a read only ledger handle. It will not be possible 
-     * to add entries to the ledger. Any attempt to add entries will throw an 
+     * This method returns a read only ledger handle. It will not be possible
+     * to add entries to the ledger. Any attempt to add entries will throw an
      * exception.
-     * 
+     *
      * Reads from the returned ledger will only be able to read entries up until
      * the lastConfirmedEntry at the point in time at which the ledger was opened.
      *
@@ -568,7 +596,16 @@ public class BookKeeper {
      */
     public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestType, final byte passwd[],
                                           final OpenCallback cb, final Object ctx) {
-        new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
+        closeLock.readLock().lock();
+        try {
+            if (closed) {
+                cb.openComplete(BKException.Code.ClientClosedException, null, ctx);
+                return;
+            }
+            new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
+        } finally {
+            closeLock.readLock().unlock();
+        }
     }
 
 
@@ -654,7 +691,16 @@ public class BookKeeper {
      *            optional control object
      */
     public void asyncDeleteLedger(final long lId, final DeleteCallback cb, final Object ctx) {
-        new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate();
+        closeLock.readLock().lock();
+        try {
+            if (closed) {
+                cb.deleteComplete(BKException.Code.ClientClosedException, ctx);
+                return;
+            }
+            new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate();
+        } finally {
+            closeLock.readLock().unlock();
+        }
     }
 
 
@@ -680,11 +726,11 @@ public class BookKeeper {
             throw BKException.create(counter.getrc());
         }
     }
-    
+
     /**
      * Check asynchronously whether the ledger with identifier <i>lId</i>
      * has been closed.
-     * 
+     *
      * @param lId   ledger identifier
      * @param cb    callback method
      */
@@ -699,11 +745,11 @@ public class BookKeeper {
             }
         });
     }
-    
+
     /**
      * Check whether the ledger with identifier <i>lId</i>
      * has been closed.
-     * 
+     *
      * @param lId
      * @return boolean true if ledger has been closed
      * @throws BKException
@@ -730,16 +776,16 @@ public class BookKeeper {
          * Call asynchronous version of isClosed
          */
         asyncIsClosed(lId, cb, null);
-        
+
         /*
          * Wait for callback
          */
         result.notifier.await();
-        
+
         if (result.rc != BKException.Code.OK) {
             throw BKException.create(result.rc);
         }
-        
+
         return result.isClosed;
     }
 
@@ -748,23 +794,38 @@ public class BookKeeper {
      *
      */
     public void close() throws InterruptedException, BKException {
-        scheduler.shutdown();
-        if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
-            LOG.warn("The scheduler did not shutdown cleanly");
-        }
-        mainWorkerPool.shutdown();
-        if (!mainWorkerPool.awaitTermination(10, TimeUnit.SECONDS)) {
-            LOG.warn("The mainWorkerPool did not shutdown cleanly");
+        closeLock.writeLock().lock();
+        try {
+            if (closed) {
+                return;
+            }
+            closed = true;
+        } finally {
+            closeLock.writeLock().unlock();
         }
 
+        // Close the bookie client so that all pending bookie requests are failed;
+        // once closed, it also rejects any incoming bookie requests.
         bookieClient.close();
         try {
+            // Close the ledger manager so that all pending metadata requests are failed;
+            // once closed, it also rejects any incoming metadata requests.
             ledgerManager.close();
             ledgerManagerFactory.uninitialize();
         } catch (IOException ie) {
             LOG.error("Failed to close ledger manager : ", ie);
         }
 
+        // Close the scheduler
+        scheduler.shutdown();
+        if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
+            LOG.warn("The scheduler did not shutdown cleanly");
+        }
+        mainWorkerPool.shutdown();
+        if (!mainWorkerPool.awaitTermination(10, TimeUnit.SECONDS)) {
+            LOG.warn("The mainWorkerPool did not shutdown cleanly");
+        }
+
         if (ownChannelFactory) {
             channelFactory.releaseExternalResources();
         }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Fri Feb 21 18:01:17 2014
@@ -468,7 +468,7 @@ public class BookKeeperAdmin {
 
             @Override
             public void processResult(int rc, String path, Object ctx) {
-                cb.recoverComplete(rc, ctx);
+                cb.recoverComplete(bkc.getReturnRc(rc), ctx);
             }
         }
 
@@ -708,7 +708,7 @@ public class BookKeeperAdmin {
         asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieAddress);
         syncCounter.block(0);
         if (syncCounter.getrc() != BKException.Code.OK) {
-            throw BKException.create(syncCounter.getrc());
+            throw BKException.create(bkc.getReturnRc(syncCounter.getrc()));
         }
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Fri Feb 21 18:01:17 2014
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -143,7 +144,11 @@ class BookieWatcher implements Watcher, 
         if (rc != KeeperException.Code.OK.intValue()) {
             //logger.error("Error while reading bookies", KeeperException.create(Code.get(rc), path));
             // try the read after a second again
-            scheduler.schedule(reReadTask, ZK_CONNECT_BACKOFF_SEC, TimeUnit.SECONDS);
+            try {
+                scheduler.schedule(reReadTask, ZK_CONNECT_BACKOFF_SEC, TimeUnit.SECONDS);
+            } catch (RejectedExecutionException ree) {
+                logger.warn("Failed to schedule reading bookies task : ", ree);
+            }
             return;
         }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java Fri Feb 21 18:01:17 2014
@@ -156,7 +156,10 @@ public class LedgerChecker {
         }
 
         public void operationComplete(int rc, LedgerFragment result) {
-            if (rc != BKException.Code.OK) {
+            if (rc == BKException.Code.ClientClosedException) {
+                cb.operationComplete(BKException.Code.ClientClosedException, badFragments);
+                return;
+            } else if (rc != BKException.Code.OK) {
                 badFragments.add(result);
             }
             if (numFragments.decrementAndGet() == 0) {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java Fri Feb 21 18:01:17 2014
@@ -247,7 +247,7 @@ public class LedgerFragmentReplicator {
             @Override
             public void readComplete(int rc, LedgerHandle lh,
                     Enumeration<LedgerEntry> seq, Object ctx) {
-                if (rc != Code.OK.intValue()) {
+                if (rc != BKException.Code.OK) {
                     LOG.error("BK error reading ledger entry: " + entryId,
                             BKException.create(rc));
                     ledgerFragmentEntryMcb.processResult(rc, null, null);
@@ -270,7 +270,7 @@ public class LedgerFragmentReplicator {
                             public void writeComplete(int rc, long ledgerId,
                                     long entryId, BookieSocketAddress addr,
                                     Object ctx) {
-                                if (rc != Code.OK.intValue()) {
+                                if (rc != BKException.Code.OK) {
                                     LOG.error(
                                             "BK error writing entry for ledgerId: "
                                                     + ledgerId + ", entryId: "
@@ -322,7 +322,7 @@ public class LedgerFragmentReplicator {
 
         @Override
         public void processResult(int rc, String path, Object ctx) {
-            if (rc != Code.OK.intValue()) {
+            if (rc != BKException.Code.OK) {
                 LOG.error("BK error replicating ledger fragments for ledger: "
                         + lh.getId(), BKException.create(rc));
                 ledgerFragmentsMcb.processResult(rc, null, null);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Feb 21 18:01:17 2014
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -239,12 +240,23 @@ public class LedgerHandle {
      *          callback implementation
      * @param ctx
      *          control object
-     * @throws InterruptedException
      */
     public void asyncClose(CloseCallback cb, Object ctx) {
         asyncCloseInternal(cb, ctx, BKException.Code.LedgerClosedException);
     }
 
+    void asyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) {
+        try {
+            doAsyncCloseInternal(cb, ctx, rc);
+        } catch (RejectedExecutionException re) {
+            LOG.debug("Failed to close ledger {} : ", ledgerId, re);
+            synchronized (this) {
+                errorOutPendingAdds(bk.getReturnRc(rc));
+            }
+            cb.closeComplete(bk.getReturnRc(BKException.Code.InterruptedException), this, ctx);
+        }
+    }
+
     /**
      * Same as public version of asyncClose except that this one takes an
      * additional parameter which is the return code to hand to all the pending
@@ -254,7 +266,7 @@ public class LedgerHandle {
      * @param ctx
      * @param rc
      */
-    void asyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) {
+    void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) {
         bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
             @Override
             public void safeRun() {
@@ -276,9 +288,9 @@ public class LedgerHandle {
                     // error out pending adds first
                     errorOutPendingAdds(rc);
 
-                    // synchronized on LedgerHandle.this to ensure that 
-                    // lastAddPushed can not be updated after the metadata 
-                    // is closed. 
+                    // synchronized on LedgerHandle.this to ensure that
+                    // lastAddPushed can not be updated after the metadata
+                    // is closed.
                     metadata.setLength(length);
                     metadata.close(lastAddConfirmed);
                     lastAddPushed = lastAddConfirmed;
@@ -529,8 +541,8 @@ public class LedgerHandle {
                     op.initiate(toSend, length);
                 }
             });
-        } catch (RuntimeException e) {
-            cb.addComplete(BKException.Code.InterruptedException,
+        } catch (RejectedExecutionException e) {
+            cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
                     LedgerHandle.this, INVALID_ENTRY_ID, ctx);
         }
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java Fri Feb 21 18:01:17 2014
@@ -173,7 +173,7 @@ class LedgerOpenOp implements GenericCal
                     } else if (rc == BKException.Code.UnauthorizedAccessException) {
                         openComplete(BKException.Code.UnauthorizedAccessException, null);
                     } else {
-                        openComplete(BKException.Code.LedgerRecoveryException, null);
+                        openComplete(bk.getReturnRc(BKException.Code.LedgerRecoveryException), null);
                     }
                 }
             });
@@ -183,7 +183,7 @@ class LedgerOpenOp implements GenericCal
                 public void readLastConfirmedComplete(int rc,
                         long lastConfirmed, Object ctx) {
                     if (rc != BKException.Code.OK) {
-                        openComplete(BKException.Code.ReadException, null);
+                        openComplete(bk.getReturnRc(BKException.Code.ReadException), null);
                     } else {
                         lh.lastAddConfirmed = lh.lastAddPushed = lastConfirmed;
                         openComplete(BKException.Code.OK, lh);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java Fri Feb 21 18:01:17 2014
@@ -152,6 +152,10 @@ class PendingAddOp implements WriteCallb
         case BKException.Code.OK:
             // continue
             break;
+        case BKException.Code.ClientClosedException:
+            // bookie client is closed.
+            lh.errorOutPendingAdds(rc);
+            return;
         case BKException.Code.LedgerFencedException:
             LOG.warn("Fencing exception on write: L{} E{} on {}",
                      new Object[] { ledgerId, entryId, addr });

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Fri Feb 21 18:01:17 2014
@@ -29,6 +29,7 @@ import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -284,28 +285,34 @@ class PendingReadOp implements Enumerati
         ArrayList<BookieSocketAddress> ensemble = null;
 
         if (speculativeReadTimeout > 0) {
-            speculativeTask = scheduler.scheduleWithFixedDelay(new Runnable() {
-                    @Override
-                    public void run() {
-                        int x = 0;
-                        for (LedgerEntryRequest r : seq) {
-                            if (!r.isComplete()) {
-                                if (null == r.maybeSendSpeculativeRead(heardFromHosts)) {
-                                    // Subsequent speculative read will not materialize anyway
-                                    cancelSpeculativeTask(false);
-                                } else {
-                                    LOG.debug("Send speculative read for {}. Hosts heard are {}.",
-                                              r, heardFromHosts);
-                                    ++x;
-                                }
+            Runnable readTask = new Runnable() {
+                public void run() {
+                    int x = 0;
+                    for (LedgerEntryRequest r : seq) {
+                        if (!r.isComplete()) {
+                            if (null == r.maybeSendSpeculativeRead(heardFromHosts)) {
+                                // Subsequent speculative read will not materialize anyway
+                                cancelSpeculativeTask(false);
+                            } else {
+                                LOG.debug("Send speculative read for {}. Hosts heard are {}.",
+                                          r, heardFromHosts);
+                                ++x;
                             }
                         }
-                        if (x > 0) {
-                            LOG.debug("Send {} speculative reads for ledger {} ({}, {}). Hosts heard are {}.",
-                                      new Object[] { x, lh.getId(), startEntryId, endEntryId, heardFromHosts });
-                        }
                     }
-                }, speculativeReadTimeout, speculativeReadTimeout, TimeUnit.MILLISECONDS);
+                    if (x > 0) {
+                        LOG.debug("Send {} speculative reads for ledger {} ({}, {}). Hosts heard are {}.",
+                                  new Object[] { x, lh.getId(), startEntryId, endEntryId, heardFromHosts });
+                    }
+                }
+            };
+            try {
+                speculativeTask = scheduler.scheduleWithFixedDelay(readTask,
+                        speculativeReadTimeout, speculativeReadTimeout, TimeUnit.MILLISECONDS);
+            } catch (RejectedExecutionException re) {
+                LOG.debug("Failed to schedule speculative reads for ledger {} ({}, {}) : ",
+                    new Object[] { lh.getId(), startEntryId, endEntryId, re });
+            }
         }
 
         do {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java Fri Feb 21 18:01:17 2014
@@ -35,6 +35,7 @@ class ReadLastConfirmedOp implements Rea
     int numResponsesPending;
     RecoveryData maxRecoveredData;
     volatile boolean completed = false;
+    int lastSeenError = BKException.Code.ReadException;
 
     LastConfirmedDataCallback cb;
     final DistributionSchedule.QuorumCoverageSet coverageSet;
@@ -104,6 +105,11 @@ class ReadLastConfirmedOp implements Rea
             cb.readLastConfirmedDataComplete(rc, maxRecoveredData);
             completed = true;
         }
+
+        if (!heardValidResponse && BKException.Code.OK != rc) {
+            lastSeenError = rc;
+        }
+
         // other return codes dont count as valid responses
         if (heardValidResponse
             && coverageSet.addBookieAndCheckCovered(bookieIndex)
@@ -119,7 +125,7 @@ class ReadLastConfirmedOp implements Rea
         if (numResponsesPending == 0 && !completed) {
             // Have got all responses back but was still not enough, just fail the operation
             LOG.error("While readLastConfirmed ledger: " + ledgerId + " did not hear success responses from all quorums");
-            cb.readLastConfirmedDataComplete(BKException.Code.LedgerRecoveryException, maxRecoveredData);
+            cb.readLastConfirmedDataComplete(lastSeenError, maxRecoveredData);
         }
 
     }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java?rev=1570647&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java Fri Feb 21 18:01:17 2014
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.zookeeper.AsyncCallback;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class CleanupLedgerManager implements LedgerManager { // Decorator over another LedgerManager: tracks in-flight callbacks and fails the leftover ones with ClientClosedException in close().
+
+    private class CleanupGenericCallback<T> implements GenericCallback<T> { // Wraps a caller callback; forwards the result only while not closed and still registered.
+
+        private final GenericCallback<T> cb; // the caller's original callback; also used as the key in 'callbacks'
+
+        CleanupGenericCallback(GenericCallback<T> cb) {
+            this.cb = cb;
+            addCallback(cb); // register so close() can find and fail it
+        }
+
+        @Override
+        public void operationComplete(int rc, T result) {
+            closeLock.readLock().lock(); // shared lock: cannot overlap the write-locked section of close()
+            try {
+                if (!closed && null != removeCallback(cb)) { // once closed, the entry is left in the map for close() to fail
+                    cb.operationComplete(rc, result);
+                }
+            } finally {
+                closeLock.readLock().unlock();
+            }
+        }
+    }
+
+    private static class ClosedLedgerRangeIterator implements LedgerRangeIterator { // Iterator returned by getLedgerRanges() after close(); every call throws IOException.
+
+        @Override
+        public boolean hasNext() throws IOException {
+            throw new IOException("Ledger manager is closed.");
+        }
+
+        @Override
+        public LedgerRange next() throws IOException {
+            throw new IOException("Ledger manager is closed.");
+        }
+    }
+
+    private final LedgerManager underlying; // delegate that performs the actual metadata operations
+    private final ConcurrentMap<GenericCallback, GenericCallback> callbacks = // in-flight callbacks, each mapped to itself
+        new ConcurrentHashMap<GenericCallback, GenericCallback>();
+    private boolean closed = false; // set under closeLock.writeLock() in close(); every read in this class holds closeLock
+    private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock(); // readers: operations and completions; writer: close()
+
+    public CleanupLedgerManager(LedgerManager lm) { // lm is the manager every call is delegated to
+        this.underlying = lm;
+    }
+
+    private void addCallback(GenericCallback callback) { // registers a callback as in-flight
+        callbacks.put(callback, callback);
+    }
+
+    @Override
+    public void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {
+        underlying.registerLedgerMetadataListener(ledgerId, listener); // passed straight through; no closed check
+    }
+
+    @Override
+    public void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {
+        underlying.unregisterLedgerMetadataListener(ledgerId, listener); // passed straight through; no closed check
+    }
+
+    private GenericCallback removeCallback(GenericCallback callback) { // returns null when the entry was already removed by someone else
+        return callbacks.remove(callback);
+    }
+
+    @Override
+    public void createLedger(LedgerMetadata metadata,
+                             GenericCallback<Long> cb) {
+        closeLock.readLock().lock();
+        try {
+            if (closed) { // fail fast without touching the underlying manager
+                cb.operationComplete(BKException.Code.ClientClosedException, null);
+                return;
+            }
+            underlying.createLedger(metadata, new CleanupGenericCallback<Long>(cb)); // wrapper registers cb as in-flight
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void removeLedgerMetadata(long ledgerId, Version version,
+                                     GenericCallback<Void> vb) {
+        closeLock.readLock().lock();
+        try {
+            if (closed) { // same closed-check / wrap-and-delegate pattern as createLedger
+                vb.operationComplete(BKException.Code.ClientClosedException, null);
+                return;
+            }
+            underlying.removeLedgerMetadata(ledgerId, version,
+                    new CleanupGenericCallback<Void>(vb));
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void readLedgerMetadata(long ledgerId,
+                                   GenericCallback<LedgerMetadata> readCb) {
+        closeLock.readLock().lock();
+        try {
+            if (closed) { // same closed-check / wrap-and-delegate pattern as createLedger
+                readCb.operationComplete(BKException.Code.ClientClosedException, null);
+                return;
+            }
+            underlying.readLedgerMetadata(ledgerId, new CleanupGenericCallback<LedgerMetadata>(readCb));
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata,
+                                    GenericCallback<Void> cb) {
+        closeLock.readLock().lock();
+        try {
+            if (closed) { // same closed-check / wrap-and-delegate pattern as createLedger
+                cb.operationComplete(BKException.Code.ClientClosedException, null);
+                return;
+            }
+            underlying.writeLedgerMetadata(ledgerId, metadata,
+                    new CleanupGenericCallback<Void>(cb));
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void asyncProcessLedgers(Processor<Long> processor,
+                                    final AsyncCallback.VoidCallback finalCb, final Object context,
+                                    final int successRc, final int failureRc) {
+        closeLock.readLock().lock();
+        try {
+            if (closed) { // already closed: report the caller-supplied failure code immediately
+                finalCb.processResult(failureRc, null, context);
+                return;
+            }
+            final GenericCallback<Void> stub = new GenericCallback<Void>() { // placeholder entry so close() can fail finalCb, which is not a GenericCallback
+                @Override
+                public void operationComplete(int rc, Void result) {
+                    finalCb.processResult(failureRc, null, context); // rc is ignored; the caller's failureRc is reported instead
+                }
+            };
+            addCallback(stub);
+            underlying.asyncProcessLedgers(processor, new AsyncCallback.VoidCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx) {
+                    if (null != removeCallback(stub)) { // skip if close() already claimed the stub and reported failure
+                        finalCb.processResult(rc, path, ctx);
+                    }
+                }
+            }, context, successRc, failureRc);
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public LedgerRangeIterator getLedgerRanges() {
+        closeLock.readLock().lock();
+        try {
+            if (closed) {
+                return new ClosedLedgerRangeIterator(); // iterator whose methods always throw IOException
+            }
+            return underlying.getLedgerRanges(); // returned as-is; this iterator is not wrapped
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void close() throws IOException { // NOTE(review): underlying.close() is not called here - presumably the owner closes it; confirm
+        Set<GenericCallback> keys;
+        closeLock.writeLock().lock();
+        try {
+            if (closed) { // repeated close() calls are no-ops
+                return;
+            }
+            closed = true;
+            keys = new HashSet<GenericCallback>(callbacks.keySet()); // snapshot taken while holding the write lock
+        } finally {
+            closeLock.writeLock().unlock();
+        }
+        for (GenericCallback key : keys) { // callbacks are invoked after the write lock is released
+            GenericCallback callback = callbacks.remove(key); // null if a completion removed it first
+            if (null != callback) {
+                callback.operationComplete(BKException.Code.ClientClosedException, null);
+            }
+        }
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java Fri Feb 21 18:01:17 2014
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -86,7 +87,19 @@ public class BookieClient {
         this.statsLogger = statsLogger;
     }
 
-    public PerChannelBookieClient lookupClient(BookieSocketAddress addr) {
+    private int getRc(int rc) { // Returns OK unchanged; any other code becomes ClientClosedException when 'closed' is set, else is returned as-is.
+        if (BKException.Code.OK == rc) {
+            return rc;
+        } else {
+            if (closed) { // 'closed' is a field declared outside this hunk
+                return BKException.Code.ClientClosedException;
+            } else {
+                return rc;
+            }
+        }
+    }
+
+    private PerChannelBookieClient lookupClient(BookieSocketAddress addr) {
         PerChannelBookieClient channel = channels.get(addr);
 
         if (channel == null) {
@@ -110,52 +123,67 @@ public class BookieClient {
     }
 
     public void closeClients(Set<BookieSocketAddress> addrs) {
-        final HashSet<PerChannelBookieClient> clients = new HashSet<PerChannelBookieClient>();
-        for (BookieSocketAddress a : addrs) {
-            PerChannelBookieClient c = channels.get(a);
-            if (c != null) {
-                clients.add(c);
+        closeLock.readLock().lock();
+        try {
+            final HashSet<PerChannelBookieClient> clients = new HashSet<PerChannelBookieClient>();
+            for (BookieSocketAddress a : addrs) {
+                PerChannelBookieClient c = channels.get(a);
+                if (c != null) {
+                    clients.add(c);
+                }
             }
-        }
 
-        if (clients.size() == 0) {
-            return;
-        }
-        executor.submit(new SafeRunnable() {
-                @Override
-                public void safeRun() {
-                    for (PerChannelBookieClient c : clients) {
-                        c.disconnect();
+            if (clients.size() == 0) {
+                return;
+            }
+            executor.submit(new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        for (PerChannelBookieClient c : clients) {
+                            c.disconnect();
+                        }
                     }
-                }
-            });
+                });
+        } finally {
+            closeLock.readLock().unlock();
+        }
     }
 
     public void addEntry(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
             final long entryId,
             final ChannelBuffer toSend, final WriteCallback cb, final Object ctx, final int options) {
-        final PerChannelBookieClient client = lookupClient(addr);
-        if (client == null) {
-            cb.writeComplete(BKException.Code.BookieHandleNotAvailableException,
-                             ledgerId, entryId, addr, ctx);
-            return;
-        }
+        closeLock.readLock().lock();
+        try {
+            final PerChannelBookieClient client = lookupClient(addr);
+            if (client == null) {
+                cb.writeComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+                                 ledgerId, entryId, addr, ctx);
+                return;
+            }
 
-        client.connectIfNeededAndDoOp(new GenericCallback<Void>() {
-            @Override
-            public void operationComplete(final int rc, Void result) {
-                if (rc != BKException.Code.OK) {
-                    executor.submitOrdered(ledgerId, new SafeRunnable() {
-                        @Override
-                        public void safeRun() {
-                            cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+            client.connectIfNeededAndDoOp(new GenericCallback<Void>() {
+                @Override
+                public void operationComplete(final int rc, Void result) {
+                    if (rc != BKException.Code.OK) {
+                        try {
+                            executor.submitOrdered(ledgerId, new SafeRunnable() {
+                                @Override
+                                public void safeRun() {
+                                    cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+                                }
+                            });
+                        } catch (RejectedExecutionException re) {
+                            cb.writeComplete(getRc(BKException.Code.InterruptedException),
+                                    ledgerId, entryId, addr, ctx);
                         }
-                    });
-                    return;
+                        return;
+                    }
+                    client.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options);
                 }
-                client.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options);
-            }
-        });
+            });
+        } finally {
+            closeLock.readLock().unlock();
+        }
     }
 
     public void readEntryAndFenceLedger(final BookieSocketAddress addr,
@@ -164,54 +192,78 @@ public class BookieClient {
                                         final long entryId,
                                         final ReadEntryCallback cb,
                                         final Object ctx) {
-        final PerChannelBookieClient client = lookupClient(addr);
-        if (client == null) {
-            cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException,
-                                 ledgerId, entryId, null, ctx);
-            return;
-        }
+        closeLock.readLock().lock();
+        try {
+            final PerChannelBookieClient client = lookupClient(addr);
+            if (client == null) {
+                cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+                                     ledgerId, entryId, null, ctx);
+                return;
+            }
 
-        client.connectIfNeededAndDoOp(new GenericCallback<Void>() {
-            @Override
-            public void operationComplete(final int rc, Void result) {
-                if (rc != BKException.Code.OK) {
-                    executor.submitOrdered(ledgerId, new SafeRunnable() {
-                        @Override
-                        public void safeRun() {
-                            cb.readEntryComplete(rc, ledgerId, entryId, null, ctx);
+            client.connectIfNeededAndDoOp(new GenericCallback<Void>() {
+                @Override
+                public void operationComplete(final int rc, Void result) {
+                    if (rc != BKException.Code.OK) {
+                        try {
+                            executor.submitOrdered(ledgerId, new SafeRunnable() {
+                                @Override
+                                public void safeRun() {
+                                    cb.readEntryComplete(rc, ledgerId, entryId, null, ctx);
+                                }
+                            });
+                        } catch (RejectedExecutionException re) {
+                            cb.readEntryComplete(getRc(BKException.Code.InterruptedException),
+                                    ledgerId, entryId, null, ctx);
                         }
-                    });
-                    return;
+                        return;
+                    }
+                    client.readEntryAndFenceLedger(ledgerId, masterKey, entryId, cb, ctx);
                 }
-                client.readEntryAndFenceLedger(ledgerId, masterKey, entryId, cb, ctx);
-            }
-        });
+            });
+        } finally {
+            closeLock.readLock().unlock();
+        }
     }
 
     public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
                           final ReadEntryCallback cb, final Object ctx) {
-        final PerChannelBookieClient client = lookupClient(addr);
-        if (client == null) {
-            cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException,
-                                 ledgerId, entryId, null, ctx);
-            return;
-        }
+        closeLock.readLock().lock();
+        try {
+            final PerChannelBookieClient client = lookupClient(addr);
+            if (client == null) {
+                cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+                                     ledgerId, entryId, null, ctx);
+                return;
+            }
 
-        client.connectIfNeededAndDoOp(new GenericCallback<Void>() {
-            @Override
-            public void operationComplete(final int rc, Void result) {
-                if (rc != BKException.Code.OK) {
-                    executor.submitOrdered(ledgerId, new SafeRunnable() {
-                        @Override
-                        public void safeRun() {
-                            cb.readEntryComplete(rc, ledgerId, entryId, null, ctx);
+            client.connectIfNeededAndDoOp(new GenericCallback<Void>() {
+                @Override
+                public void operationComplete(final int rc, Void result) {
+                    if (rc != BKException.Code.OK) {
+                        try {
+                            executor.submitOrdered(ledgerId, new SafeRunnable() {
+                                @Override
+                                public void safeRun() {
+                                    cb.readEntryComplete(rc, ledgerId, entryId, null, ctx);
+                                }
+                            });
+                        } catch (RejectedExecutionException re) {
+                            cb.readEntryComplete(getRc(BKException.Code.InterruptedException),
+                                    ledgerId, entryId, null, ctx);
                         }
-                    });
-                    return;
+                        return;
+                    }
+                    client.readEntry(ledgerId, entryId, cb, ctx);
                 }
-                client.readEntry(ledgerId, entryId, cb, ctx);
-            }
-        });
+            });
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
+    public boolean isClosed() { // Returns the current value of the 'closed' field; no lock is taken here.
+        return closed;
     }
 
     public void close() {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Fri Feb 21 18:01:17 2014
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
@@ -70,7 +71,6 @@ public class PerChannelBookieClient exte
 
     static final Logger LOG = LoggerFactory.getLogger(PerChannelBookieClient.class);
 
-    static final long maxMemory = Runtime.getRuntime().maxMemory() / 5;
     public static final int MAX_FRAME_LENGTH = 2 * 1024 * 1024; // 2M
 
     BookieSocketAddress addr;
@@ -104,9 +104,10 @@ public class PerChannelBookieClient exte
 
     enum ConnectionState {
         DISCONNECTED, CONNECTING, CONNECTED, CLOSED
-    };
+    }
 
     volatile ConnectionState state;
+    final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
     private final ClientConfiguration conf;
 
     /**
@@ -190,6 +191,19 @@ public class PerChannelBookieClient exte
         }
     }
 
+    private void completeOperation(GenericCallback<Void> op, int rc) { // Completes op with rc, or with ClientClosedException if this client is already CLOSED.
+        closeLock.readLock().lock(); // read lock: close() sets state to CLOSED under the write lock
+        try {
+            if (ConnectionState.CLOSED == state) {
+                op.operationComplete(BKException.Code.ClientClosedException, null); // the original rc is discarded in this case
+            } else {
+                op.operationComplete(rc, null);
+            }
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
     private void connect() {
         LOG.info("Connecting to bookie: {}", addr);
 
@@ -247,7 +261,7 @@ public class PerChannelBookieClient exte
                 }
 
                 for (GenericCallback<Void> pendingOp : oldPendingOps) {
-                    pendingOp.operationComplete(rc, null);
+                    completeOperation(pendingOp, rc);
                 }
             }
         });
@@ -292,7 +306,7 @@ public class PerChannelBookieClient exte
         }
 
         if (completeOpNow) {
-            op.operationComplete(opRc, null);
+            completeOperation(op, opRc);
         }
 
     }
@@ -445,6 +459,16 @@ public class PerChannelBookieClient exte
      * Closes the bookie client permanently. It cannot be reused.
      */
     public void close() {
+        closeLock.writeLock().lock();
+        try {
+            if (ConnectionState.CLOSED == state) {
+                return;
+            }
+            state = ConnectionState.CLOSED;
+            errorOutOutstandingEntries(BKException.Code.ClientClosedException);
+        } finally {
+            closeLock.writeLock().unlock();
+        }
         closeInternal(true);
     }
 
@@ -474,6 +498,10 @@ public class PerChannelBookieClient exte
     }
 
     void errorOutReadKey(final CompletionKey key) {
+        errorOutReadKey(key, BKException.Code.BookieHandleNotAvailableException);
+    }
+
+    void errorOutReadKey(final CompletionKey key, final int rc) {
         executor.submitOrdered(key.ledgerId, new SafeRunnable() {
             @Override
             public void safeRun() {
@@ -490,7 +518,7 @@ public class PerChannelBookieClient exte
                               + " ledger-id: {} bookie: {}",
                               new Object[] { key.entryId, key.ledgerId, bAddress });
 
-                    readCompletion.cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException,
+                    readCompletion.cb.readEntryComplete(rc,
                                                         key.ledgerId, key.entryId, null, readCompletion.ctx);
                 }
             }
@@ -499,6 +527,10 @@ public class PerChannelBookieClient exte
     }
 
     void errorOutAddKey(final CompletionKey key) {
+        errorOutAddKey(key, BKException.Code.BookieHandleNotAvailableException);
+    }
+
+    void errorOutAddKey(final CompletionKey key, final int rc) {
         executor.submitOrdered(key.ledgerId, new SafeRunnable() {
             @Override
             public void safeRun() {
@@ -514,7 +546,7 @@ public class PerChannelBookieClient exte
                     LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {}",
                               new Object[] { key.entryId, key.ledgerId, bAddress });
 
-                    addCompletion.cb.writeComplete(BKException.Code.BookieHandleNotAvailableException, key.ledgerId,
+                    addCompletion.cb.writeComplete(rc, key.ledgerId,
                                                    key.entryId, addr, addCompletion.ctx);
                     LOG.debug("Invoked callback method: {}", key.entryId);
                 }
@@ -531,7 +563,7 @@ public class PerChannelBookieClient exte
      * here.
      */
 
-    void errorOutOutstandingEntries() {
+    void errorOutOutstandingEntries(int rc) {
 
         // DO NOT rewrite these using Map.Entry iterations. We want to iterate
         // on keys and see if we are successfully able to remove the key from
@@ -541,11 +573,11 @@ public class PerChannelBookieClient exte
         // calling the application callback.
 
         for (CompletionKey key : addCompletions.keySet()) {
-            errorOutAddKey(key);
+            errorOutAddKey(key, rc);
         }
 
         for (CompletionKey key : readCompletions.keySet()) {
-            errorOutReadKey(key);
+            errorOutReadKey(key, rc);
         }
     }
 
@@ -580,7 +612,7 @@ public class PerChannelBookieClient exte
             closeChannel(c);
         }
 
-        errorOutOutstandingEntries();
+        errorOutOutstandingEntries(BKException.Code.BookieHandleNotAvailableException);
 
         synchronized (this) {
             if (this.channel == c

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java?rev=1570647&r1=1570646&r2=1570647&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java Fri Feb 21 18:01:17 2014
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.util;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,17 +15,21 @@ package org.apache.bookkeeper.util;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.bookkeeper.util;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Random;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.commons.lang.StringUtils;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class provides 2 things over the java {@link ScheduledExecutorService}.
@@ -45,12 +47,13 @@ import com.google.common.util.concurrent
  *
  */
 public class OrderedSafeExecutor {
-    ExecutorService threads[];
-    Random rand = new Random();
+    final ExecutorService threads[];
+    final long threadIds[];
+    final Random rand = new Random();
 
     /**
      * Constructs Safe executor
-     * 
+     *
      * @param numThreads
      *            - number of threads
      * @param threadName
@@ -65,6 +68,7 @@ public class OrderedSafeExecutor {
             threadName = "OrderedSafeExecutor";
         }
         threads = new ExecutorService[numThreads];
+        threadIds = new long[numThreads];
         for (int i = 0; i < numThreads; i++) {
             StringBuilder thName = new StringBuilder(threadName);
             thName.append("-");
@@ -73,6 +77,19 @@ public class OrderedSafeExecutor {
             ThreadFactoryBuilder tfb = new ThreadFactoryBuilder()
                     .setNameFormat(thName.toString());
             threads[i] = Executors.newSingleThreadExecutor(tfb.build());
+            final int tid = i;
+            try {
+                threads[i].submit(new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        threadIds[tid] = Thread.currentThread().getId();
+                    }
+                }).get();
+            } catch (InterruptedException e) {
+                throw new RuntimeException("Couldn't start thread " + i, e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException("Couldn't start thread " + i, e);
+            }
         }
     }
 
@@ -112,6 +129,15 @@ public class OrderedSafeExecutor {
         chooseThread(orderingKey).submit(r);
     }
 
+    private long getThreadID(Object orderingKey) {
+        // skip hashcode generation in this special case
+        if (threadIds.length == 1) {
+            return threadIds[0];
+        }
+
+        return threadIds[MathUtils.signSafeMod(orderingKey.hashCode(), threadIds.length)];
+    }
+
     public void shutdown() {
         for (int i = 0; i < threads.length; i++) {
             threads[i].shutdown();
@@ -132,6 +158,8 @@ public class OrderedSafeExecutor {
      */
     public static abstract class OrderedSafeGenericCallback<T>
             implements GenericCallback<T> {
+        private final Logger LOG = LoggerFactory.getLogger(OrderedSafeGenericCallback.class);
+
         private final OrderedSafeExecutor executor;
         private final Object orderingKey;
 
@@ -147,12 +175,24 @@ public class OrderedSafeExecutor {
 
         @Override
         public final void operationComplete(final int rc, final T result) {
-            executor.submitOrdered(orderingKey, new SafeRunnable() {
-                    @Override
-                    public void safeRun() {
-                        safeOperationComplete(rc, result);
-                    }
-                });
+            // during closing, callbacks that are errored out might try to submit to
+            // the scheduler again. if the submission will go to same thread, we
+            // don't need to submit to executor again. this is also an optimization for
+            // callback submission
+            if (Thread.currentThread().getId() == executor.getThreadID(orderingKey)) {
+                safeOperationComplete(rc, result);
+            } else {
+                try {
+                    executor.submitOrdered(orderingKey, new SafeRunnable() {
+                            @Override
+                            public void safeRun() {
+                                safeOperationComplete(rc, result);
+                            }
+                        });
+                } catch (RejectedExecutionException re) {
+                    LOG.warn("Failed to submit callback for {} : ", orderingKey, re);
+                }
+            }
         }
 
         public abstract void safeOperationComplete(int rc, T result);



Mime
View raw message