bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-965: Long Polling Part I: Changes in the write path
Date Thu, 17 Nov 2016 23:18:39 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 2ba9a2c4f -> 9359d682a


BOOKKEEPER-965: Long Polling Part I: Changes in the write path

This is the first in a series of changes for enabling long polling between the bookkeeper client
and the bookkeeper server. The changes were originally implemented in the Twitter fork, and
these pull requests combine multiple commits from Twitter's bookkeeper fork, as they include
not only the changes made initially but also the bug fixes added since.

The first change covers the write path (AddEntry). We track the last add
confirmed in the FileInfo so that we can trigger actions when the value is updated.

Author: Robin Dhamankar <robindh@Robins-MacBook-Air.local>

Reviewers: sijie@apache.org <sijie@apache.org>

Closes #73 from robindh/LongPollWritePath


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/9359d682
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/9359d682
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/9359d682

Branch: refs/heads/master
Commit: 9359d682a1598e30364eca6021d976f911e055b2
Parents: 2ba9a2c
Author: Robin Dhamankar <robindh@Robins-MacBook-Air.local>
Authored: Thu Nov 17 15:18:35 2016 -0800
Committer: Sijie Guo <sijie@apache.org>
Committed: Thu Nov 17 15:18:35 2016 -0800

----------------------------------------------------------------------
 .../org/apache/bookkeeper/bookie/Bookie.java    |  5 ++++
 .../org/apache/bookkeeper/bookie/FileInfo.java  | 14 ++++++++++++
 .../bookkeeper/bookie/IndexPersistenceMgr.java  | 24 ++++++++++++++++++++
 .../bookie/InterleavedLedgerStorage.java        | 20 ++++++++++++++++
 .../apache/bookkeeper/bookie/LedgerCache.java   |  3 +++
 .../bookkeeper/bookie/LedgerCacheImpl.java      | 10 ++++++++
 .../bookkeeper/bookie/LedgerDescriptor.java     |  1 +
 .../bookkeeper/bookie/LedgerDescriptorImpl.java |  5 ++++
 .../apache/bookkeeper/bookie/LedgerStorage.java | 10 ++++++++
 .../bookkeeper/bookie/SortedLedgerStorage.java  |  2 ++
 .../bookkeeper/bookie/TestSyncThread.java       |  5 ++++
 .../bookkeeper/meta/LedgerManagerTestCase.java  |  5 ++++
 12 files changed, 104 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9359d682/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 178f2bb..6200d21 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -1305,6 +1305,11 @@ public class Bookie extends BookieCriticalThread {
         }
     }
 
+    public long readLastAddConfirmed(long ledgerId) throws IOException {
+        LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
+        return handle.getLastAddConfirmed();
+    }
+
     // The rest of the code is test stuff
     static class CounterCallback implements WriteCallback {
         int count;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9359d682/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index 3b7c8dc..38ff0d9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -80,6 +80,9 @@ class FileInfo {
     private int stateBits;
     private boolean needFlushHeader = false;
 
+    // lac
+    private Long lac = null;
+
     // file access mode
     protected String mode;
 
@@ -90,6 +93,17 @@ class FileInfo {
         mode = "rw";
     }
 
+    synchronized Long getLastAddConfirmed() {
+        return lac;
+    }
+
+    synchronized long setLastAddConfirmed(long lac) {
+        if (null == this.lac || this.lac < lac) {
+            this.lac = lac;
+        }
+        return this.lac;
+    }
+
     public File getLf() {
         return lf;
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9359d682/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 8988985..f1ffcde 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -318,6 +318,30 @@ public class IndexPersistenceMgr {
         fileInfoCache.clear();
     }
 
+    Long getLastAddConfirmed(long ledgerId) throws IOException {
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            return fi.getLastAddConfirmed();
+        } finally {
+            if (null != fi) {
+                fi.release();
+            }
+        }
+    }
+
+    long updateLastAddConfirmed(long ledgerId, long lac) throws IOException {
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            return fi.setLastAddConfirmed(lac);
+        } finally {
+            if (null != fi) {
+                fi.release();
+            }
+        }
+    }
+
     byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
         FileInfo fi = fileInfoCache.get(ledgerId);
         if (fi == null) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9359d682/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index e85c8db..ee6edd5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -231,13 +231,33 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage,
Entry
     }
 
     @Override
+    public long getLastAddConfirmed(long ledgerId) throws IOException {
+        Long lac = ledgerCache.getLastAddConfirmed(ledgerId);
+        if (lac == null) {
+            ByteBuffer bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
+            if (null == bb) {
+                return BookieProtocol.INVALID_ENTRY_ID;
+            } else {
+                bb.getLong(); // ledger id
+                bb.getLong(); // entry id
+                lac = bb.getLong();
+                lac = ledgerCache.updateLastAddConfirmed(ledgerId, lac);
+            }
+        }
+        return lac;
+    }
+
+    @Override
     synchronized public long addEntry(ByteBuffer entry) throws IOException {
         long ledgerId = entry.getLong();
         long entryId = entry.getLong();
+        long lac = entry.getLong();
         entry.rewind();
 
         processEntry(ledgerId, entryId, entry);
 
+        ledgerCache.updateLastAddConfirmed(ledgerId, lac);
+
         return entryId;
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9359d682/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index b2f8a91..4e0fdc1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -44,6 +44,9 @@ interface LedgerCache extends Closeable {
     void flushLedger(boolean doAll) throws IOException;
     long getLastEntry(long ledgerId) throws IOException;
 
+    Long getLastAddConfirmed(long ledgerId) throws IOException;
+    long updateLastAddConfirmed(long ledgerId, long lac) throws IOException;
+
     void deleteLedger(long ledgerId) throws IOException;
 
     LedgerCacheBean getJMXBean();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9359d682/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index e6d2edd..db84268 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -73,6 +73,16 @@ public class LedgerCacheImpl implements LedgerCache {
     }
 
     @Override
+    public Long getLastAddConfirmed(long ledgerId) throws IOException {
+        return indexPersistenceManager.getLastAddConfirmed(ledgerId);
+    }
+
+    @Override
+    public long updateLastAddConfirmed(long ledgerId, long lac) throws IOException {
+        return indexPersistenceManager.updateLastAddConfirmed(ledgerId, lac);
+    }
+
+    @Override
     public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
         indexPageManager.putEntryOffset(ledger, entry, offset);
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9359d682/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index 02850d1..c7a8c97 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -59,4 +59,5 @@ public abstract class LedgerDescriptor {
 
     abstract long addEntry(ByteBuffer entry) throws IOException;
     abstract ByteBuffer readEntry(long entryId) throws IOException;
+    abstract long getLastAddConfirmed() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9359d682/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index d9814b6..266236d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -84,4 +84,9 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
     ByteBuffer readEntry(long entryId) throws IOException {
         return ledgerStorage.getEntry(ledgerId, entryId);
     }
+
+    @Override
+    long getLastAddConfirmed() throws IOException {
+        return ledgerStorage.getLastAddConfirmed(ledgerId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9359d682/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index f2f00c4..b0015bd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -109,6 +109,16 @@ public interface LedgerStorage {
     ByteBuffer getEntry(long ledgerId, long entryId) throws IOException;
 
     /**
+     * Get last add confirmed.
+     *
+     * @param ledgerId
+     *          ledger id.
+     * @return last add confirmed.
+     * @throws IOException
+     */
+    long getLastAddConfirmed(long ledgerId) throws IOException;
+
+    /**
      * Flushes all data in the storage. Once this is called,
      * add data written to the LedgerStorage up until this point
      * has been persisted to perminant storage

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9359d682/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index dcd1213..8e72852 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -96,8 +96,10 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
     public long addEntry(ByteBuffer entry) throws IOException {
         long ledgerId = entry.getLong();
         long entryId = entry.getLong();
+        long lac = entry.getLong();
         entry.rewind();
         memTable.addEntry(ledgerId, entryId, entry, this);
+        ledgerCache.updateLastAddConfirmed(ledgerId, lac);
         return entryId;
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9359d682/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index e56d566..988c2a2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -325,6 +325,11 @@ public class TestSyncThread {
         }
 
         @Override
+        public long getLastAddConfirmed(long ledgerId) throws IOException {
+            return 0;
+        }
+
+        @Override
         public void flush() throws IOException {
         }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9359d682/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 1e7c9a6..cf7fdcc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -161,6 +161,11 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase
{
         }
 
         @Override
+        public long getLastAddConfirmed(long ledgerId) throws IOException {
+            return 0;
+        }
+
+        @Override
         public void flush() throws IOException {
         }
 


Mime
View raw message