zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1327027 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
Date Tue, 17 Apr 2012 10:18:56 GMT
Author: ivank
Date: Tue Apr 17 10:18:55 2012
New Revision: 1327027

URL: http://svn.apache.org/viewvc?rev=1327027&view=rev
Log:
BOOKKEEPER-196: Define interface between bookie and ledger storage (ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1327027&r1=1327026&r2=1327027&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Apr 17 10:18:55 2012
@@ -86,6 +86,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-216: Bookie doesn't exit with right exit code (sijie via ivank)
 
+        BOOKKEEPER-196: Define interface between bookie and ledger storage (ivank)
+
       hedwig-server/
       
         BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region
is down. (Sijie Gou via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1327027&r1=1327026&r2=1327027&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
Tue Apr 17 10:18:55 2012
@@ -84,6 +84,7 @@ public class Bookie extends Thread {
 
     final SyncThread syncThread;
     final LedgerManager ledgerManager;
+    final LedgerStorage ledgerStorage;
     final HandleFactory handles;
 
     static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
@@ -105,7 +106,7 @@ public class Bookie extends Thread {
 
     // jmx related beans
     BookieBean jmxBookieBean;
-    LedgerCacheBean jmxLedgerCacheBean;
+    BKMBeanInfo jmxLedgerStorageBean;
 
     Map<Long, byte[]> masterKeyCache = Collections.synchronizedMap(new HashMap<Long,
byte[]>());
 
@@ -137,12 +138,6 @@ public class Bookie extends Thread {
         }
     }
 
-    EntryLogger entryLogger;
-    LedgerCache ledgerCache;
-    // This is the thread that garbage collects the entry logs that do not
-    // contain any active ledgers in them; and compacts the entry logs that
-    // has lower remaining percentage to reclaim disk space.
-    final GarbageCollectorThread gcThread;
 
     /**
      * SyncThread is a background thread which flushes ledger index pages periodically.
@@ -185,7 +180,7 @@ public class Bookie extends Thread {
                 synchronized(this) {
                     try {
                         wait(flushInterval);
-                        if (!entryLogger.testAndClearSomethingWritten()) {
+                        if (!ledgerStorage.isFlushRequired()) {
                             continue;
                         }
                     } catch (InterruptedException e) {
@@ -208,17 +203,11 @@ public class Bookie extends Thread {
 
                 boolean flushFailed = false;
                 try {
-                    ledgerCache.flushLedger(true);
+                    ledgerStorage.flush();
                 } catch (IOException e) {
                     LOG.error("Exception flushing Ledger", e);
                     flushFailed = true;
                 }
-                try {
-                    entryLogger.flush();
-                } catch (IOException e) {
-                    LOG.error("Exception flushing entry logger", e);
-                    flushFailed = true;
-                }
 
                 // if flush failed, we should not roll last mark, otherwise we would
                 // have some ledgers are not flushed and their journal entries were lost
@@ -378,27 +367,6 @@ public class Bookie extends Thread {
         return currentDirs;
     }
 
-    /**
-     * Scanner used to do entry log compaction
-     */
-    class EntryLogCompactionScanner implements EntryLogger.EntryLogScanner {
-        @Override
-        public boolean accept(long ledgerId) {
-            // bookie has no knowledge about which ledger is deleted
-            // so just accept all ledgers.
-            return true;
-        }
-
-        @Override
-        public void process(long ledgerId, ByteBuffer buffer)
-            throws IOException {
-            try {
-                Bookie.this.addEntryByLedgerId(ledgerId, buffer);
-            } catch (BookieException be) {
-                throw new IOException(be);
-            }
-        }
-    }
 
     public Bookie(ServerConfiguration conf)
             throws IOException, KeeperException, InterruptedException, BookieException {
@@ -416,11 +384,9 @@ public class Bookie extends Thread {
         ledgerManager = LedgerManagerFactory.newLedgerManager(conf, this.zk);
 
         syncThread = new SyncThread(conf);
-        entryLogger = new EntryLogger(conf);
-        ledgerCache = new LedgerCacheImpl(conf, ledgerManager);
-        gcThread = new GarbageCollectorThread(conf, this.zk, ledgerCache, entryLogger,
-                ledgerManager, new EntryLogCompactionScanner());
-        handles = new HandleFactoryImpl(entryLogger, ledgerCache);
+        ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager);
+
+        handles = new HandleFactoryImpl(ledgerStorage);
 
         // replay journals
         readJournal();
@@ -506,7 +472,7 @@ public class Bookie extends Thread {
                     } else {
                         byte[] key = masterKeyCache.get(ledgerId);
                         if (key == null) {
-                            key = ledgerCache.readMasterKey(ledgerId);
+                            key = ledgerStorage.readMasterKey(ledgerId);
                         }
                         LedgerDescriptor handle = handles.getHandle(ledgerId, key);
 
@@ -527,7 +493,8 @@ public class Bookie extends Thread {
         LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
         super.start();
         syncThread.start();
-        gcThread.start();
+
+        ledgerStorage.start();
         // set running here.
         // since bookie server use running as a flag to tell bookie server whether it is
alive
         // if setting it in bookie thread, the watcher might run before bookie thread.
@@ -584,13 +551,12 @@ public class Bookie extends Thread {
             BKMBeanRegistry.getInstance().register(jmxBookieBean, parent);
 
             try {
-                jmxLedgerCacheBean = this.ledgerCache.getJMXBean();
-                BKMBeanRegistry.getInstance().register(jmxLedgerCacheBean, jmxBookieBean);
+                jmxLedgerStorageBean = this.ledgerStorage.getJMXBean();
+                BKMBeanRegistry.getInstance().register(jmxLedgerStorageBean, jmxBookieBean);
             } catch (Exception e) {
                 LOG.warn("Failed to register with JMX for ledger cache", e);
-                jmxLedgerCacheBean = null;
+                jmxLedgerStorageBean = null;
             }
-
         } catch (Exception e) {
             LOG.warn("Failed to register with JMX", e);
             jmxBookieBean = null;
@@ -602,8 +568,8 @@ public class Bookie extends Thread {
      */
     public void unregisterJMX() {
         try {
-            if (jmxLedgerCacheBean != null) {
-                BKMBeanRegistry.getInstance().unregister(jmxLedgerCacheBean);
+            if (jmxLedgerStorageBean != null) {
+                BKMBeanRegistry.getInstance().unregister(jmxLedgerStorageBean);
             }
         } catch (Exception e) {
             LOG.warn("Failed to unregister with JMX", e);
@@ -616,7 +582,7 @@ public class Bookie extends Thread {
             LOG.warn("Failed to unregister with JMX", e);
         }
         jmxBookieBean = null;
-        jmxLedgerCacheBean = null;
+        jmxLedgerStorageBean = null;
     }
 
 
@@ -940,17 +906,16 @@ public class Bookie extends Thread {
                 this.exitCode = exitCode;
                 // mark bookie as in shutting down progress
                 shuttingdown = true;
-                // shut down gc thread, which depends on zookeeper client
-                // also compaction will write entries again to entry log file
-                gcThread.shutdown();
+
+                // Shutdown the EntryLogger which has the GarbageCollector Thread running
+                ledgerStorage.shutdown();
+
                 // Shutdown the ZK client
                 if(zk != null) zk.close();
                 this.interrupt();
                 this.join();
                 syncThread.shutdown();
 
-                // Shutdown the EntryLogger which has the GarbageCollector Thread running
-                entryLogger.shutdown();
                 // close Ledger Manager
                 ledgerManager.close();
                 // setting running to false here, so watch thread in bookie server know it
only after bookie shut down
@@ -998,7 +963,7 @@ public class Bookie extends Thread {
 
     protected void addEntryByLedgerId(long ledgerId, ByteBuffer entry)
         throws IOException, BookieException {
-        byte[] key = ledgerCache.readMasterKey(ledgerId);
+        byte[] key = ledgerStorage.readMasterKey(ledgerId);
         LedgerDescriptor handle = handles.getHandle(ledgerId, key);
         handle.addEntry(entry);
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1327027&r1=1327026&r2=1327027&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
Tue Apr 17 10:18:55 2012
@@ -70,9 +70,6 @@ public class EntryLogger {
     final static int LOGFILE_HEADER_SIZE = 1024;
     final ByteBuffer LOGFILE_HEADER = ByteBuffer.allocate(LOGFILE_HEADER_SIZE);
 
-    // this indicates that a write has happened since the last flush
-    private volatile boolean somethingWritten = false;
-
     final static long MB = 1024 * 1024;
 
     /**
@@ -284,7 +281,7 @@ public class EntryLogger {
         long pos = logChannel.position();
         logChannel.write(entry);
         //logChannel.flush(false);
-        somethingWritten = true;
+
         return (logId << 32L) | pos;
     }
 
@@ -365,14 +362,6 @@ public class EntryLogger {
         throw new FileNotFoundException("No file for log " + Long.toHexString(logId));
     }
 
-    synchronized public boolean testAndClearSomethingWritten() {
-        try {
-            return somethingWritten;
-        } finally {
-            somethingWritten = false;
-        }
-    }
-
     /**
      * Scan entry log
      *

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1327027&r1=1327026&r2=1327027&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
Tue Apr 17 10:18:55 2012
@@ -75,9 +75,6 @@ public class GarbageCollectorThread exte
 
     final LedgerManager ledgerManager;
 
-    // ZooKeeper Client
-    final ZooKeeper zk;
-
     // flag to ensure gc thread will not be interrupted during compaction
     // to reduce the risk getting entry log corrupted
     final AtomicBoolean compacting = new AtomicBoolean(false);
@@ -114,7 +111,6 @@ public class GarbageCollectorThread exte
      * @throws IOException
      */
     public GarbageCollectorThread(ServerConfiguration conf,
-                                  ZooKeeper zookeeper,
                                   LedgerCache ledgerCache,
                                   EntryLogger entryLogger,
                                   LedgerManager ledgerManager,
@@ -122,7 +118,6 @@ public class GarbageCollectorThread exte
         throws IOException {
         super("GarbageCollectorThread");
 
-        this.zk = zookeeper;
         this.ledgerCache = ledgerCache;
         this.entryLogger = entryLogger;
         this.ledgerManager = ledgerManager;
@@ -189,11 +184,6 @@ public class GarbageCollectorThread exte
                 }
             }
 
-            // Dependency check.
-            if (null == zk) {
-                continue;
-            }
-
             // Extract all of the ledger ID's that comprise all of the entry logs
             // (except for the current new one which is still being written to).
             try {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java?rev=1327027&r1=1327026&r2=1327027&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
Tue Apr 17 10:18:55 2012
@@ -29,12 +29,10 @@ class HandleFactoryImpl implements Handl
     HashMap<Long, LedgerDescriptor> readOnlyLedgers
         = new HashMap<Long, LedgerDescriptor>();
 
-    final EntryLogger entryLogger;
-    final LedgerCache ledgerCache;
+    final LedgerStorage ledgerStorage;
 
-    HandleFactoryImpl(EntryLogger entryLogger, LedgerCache ledgerCache) {
-        this.entryLogger = entryLogger;
-        this.ledgerCache = ledgerCache;
+    HandleFactoryImpl(LedgerStorage ledgerStorage) {
+        this.ledgerStorage = ledgerStorage;
     }
 
     @Override
@@ -44,8 +42,7 @@ class HandleFactoryImpl implements Handl
         synchronized (ledgers) {
             handle = ledgers.get(ledgerId);
             if (handle == null) {
-                handle = LedgerDescriptor.create(masterKey, ledgerId,
-                                                 entryLogger, ledgerCache);
+                handle = LedgerDescriptor.create(masterKey, ledgerId, ledgerStorage);
                 ledgers.put(ledgerId, handle);
             }
             handle.checkAccess(masterKey);
@@ -60,7 +57,7 @@ class HandleFactoryImpl implements Handl
         synchronized (ledgers) {
             handle = readOnlyLedgers.get(ledgerId);
             if (handle == null) {
-                handle = LedgerDescriptor.createReadOnly(ledgerId, entryLogger, ledgerCache);
+                handle = LedgerDescriptor.createReadOnly(ledgerId, ledgerStorage);
                 readOnlyLedgers.put(ledgerId, handle);
             }
         }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1327027&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
(added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
Tue Apr 17 10:18:55 2012
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+
+import org.apache.bookkeeper.jmx.BKMBeanInfo;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Interleave ledger storage
+ * This ledger storage implementation stores all entries in a single
+ * file and maintains an index file for each ledger.
+ */
+class InterleavedLedgerStorage implements LedgerStorage {
+    final static Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class);
+
+    private EntryLogger entryLogger;
+    private LedgerCache ledgerCache;
+    // This is the thread that garbage collects the entry logs that do not
+    // contain any active ledgers in them; and compacts the entry logs that
+    // has lower remaining percentage to reclaim disk space.
+    final GarbageCollectorThread gcThread;
+
+    // this indicates that a write has happened since the last flush
+    private volatile boolean somethingWritten = false;
+
+    InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager)
+            throws IOException {
+        entryLogger = new EntryLogger(conf);
+        ledgerCache = new LedgerCacheImpl(conf, ledgerManager);
+        gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
+                ledgerManager, new EntryLogCompactionScanner());
+    }
+
+    @Override    
+    public void start() {
+        gcThread.start();
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException {
+        // shut down gc thread, which depends on zookeeper client
+        // also compaction will write entries again to entry log file
+        gcThread.shutdown();
+        entryLogger.shutdown();
+    }
+
+    @Override
+    public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+        ledgerCache.setMasterKey(ledgerId, masterKey);
+    }
+
+    @Override
+    public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
+        return ledgerCache.readMasterKey(ledgerId);
+    }
+
+    @Override
+    public boolean ledgerExists(long ledgerId) throws IOException {
+        return ledgerCache.ledgerExists(ledgerId);
+    }
+
+    @Override
+    synchronized public long addEntry(ByteBuffer entry) throws IOException {
+        long ledgerId = entry.getLong();
+        long entryId = entry.getLong();
+        entry.rewind();
+        
+        /*
+         * Log the entry
+         */
+        long pos = entryLogger.addEntry(ledgerId, entry);
+        
+        
+        /*
+         * Set offset of entry id to be the current ledger position
+         */
+        ledgerCache.putEntryOffset(ledgerId, entryId, pos);
+
+        somethingWritten = true;
+
+        return entryId;
+    }
+
+    @Override
+    public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
+        long offset;
+        /*
+         * If entryId is -1, then return the last written.
+         */
+        if (entryId == -1) {
+            entryId = ledgerCache.getLastEntry(ledgerId);
+        }
+
+        offset = ledgerCache.getEntryOffset(ledgerId, entryId);
+        if (offset == 0) {
+            throw new Bookie.NoEntryException(ledgerId, entryId);
+        }
+        return ByteBuffer.wrap(entryLogger.readEntry(ledgerId, entryId, offset));
+    }
+
+    @Override
+    public boolean isFlushRequired() {
+        return somethingWritten;
+    };
+
+    @Override
+    public void flush() throws IOException {
+        synchronized (entryLogger) {
+            if (!somethingWritten) {
+                return;
+            }
+            somethingWritten = false;
+            boolean flushFailed = false;
+
+            try {
+                ledgerCache.flushLedger(true);
+            } catch (IOException ioe) {
+                LOG.error("Exception flushing Ledger cache", ioe);
+                flushFailed = true;
+            }
+            
+            try {
+                entryLogger.flush();
+            } catch (IOException ioe) {
+                LOG.error("Exception flushing Ledger", ioe);
+                flushFailed = true;
+            }
+            if (flushFailed) {
+                throw new IOException("Flushing to storage failed, check logs");
+            }
+        }
+    }
+
+    @Override
+    public BKMBeanInfo getJMXBean() {
+        return ledgerCache.getJMXBean();
+    }
+
+    /**
+     * Scanner used to do entry log compaction
+     */
+    class EntryLogCompactionScanner implements EntryLogger.EntryLogScanner {
+        @Override
+        public boolean accept(long ledgerId) {
+            // bookie has no knowledge about which ledger is deleted
+            // so just accept all ledgers.
+            return true;
+        }
+
+        @Override
+        public void process(long ledgerId, ByteBuffer buffer)
+            throws IOException {
+            addEntry(buffer);
+        }
+    }
+
+}
\ No newline at end of file

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java?rev=1327027&r1=1327026&r2=1327027&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
Tue Apr 17 10:18:55 2012
@@ -35,21 +35,19 @@ import org.slf4j.LoggerFactory;
 public abstract class LedgerDescriptor {
     static LedgerDescriptor create(byte[] masterKey,
                                    long ledgerId,
-                                   EntryLogger entryLogger,
-                                   LedgerCache ledgerCache) throws IOException {
-        LedgerDescriptor ledger = new LedgerDescriptorImpl(masterKey, ledgerId, entryLogger,
ledgerCache);
-        ledgerCache.setMasterKey(ledgerId, masterKey);
+                                   LedgerStorage ledgerStorage) throws IOException {
+        LedgerDescriptor ledger = new LedgerDescriptorImpl(masterKey, ledgerId, ledgerStorage);
+        ledgerStorage.setMasterKey(ledgerId, masterKey);
         return ledger;
     }
 
     static LedgerDescriptor createReadOnly(long ledgerId,
-                                           EntryLogger entryLogger,
-                                           LedgerCache ledgerCache)
+                                           LedgerStorage ledgerStorage)
             throws IOException, Bookie.NoLedgerException {
-        if (!ledgerCache.ledgerExists(ledgerId)) {
+        if (!ledgerStorage.ledgerExists(ledgerId)) {
             throw new Bookie.NoLedgerException(ledgerId);
         }
-        return new LedgerDescriptorReadOnlyImpl(ledgerId, entryLogger, ledgerCache);
+        return new LedgerDescriptorReadOnlyImpl(ledgerId, ledgerStorage);
     }
 
     abstract void checkAccess(byte masterKey[]) throws BookieException, IOException;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java?rev=1327027&r1=1327026&r2=1327027&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
Tue Apr 17 10:18:55 2012
@@ -35,18 +35,16 @@ import org.slf4j.LoggerFactory;
  */
 public class LedgerDescriptorImpl extends LedgerDescriptor {
     final static Logger LOG = LoggerFactory.getLogger(LedgerDescriptor.class);
-    LedgerCache ledgerCache;
+    final LedgerStorage ledgerStorage;
     private long ledgerId;
-    EntryLogger entryLogger;
 
     volatile private boolean fenced = false;
     final byte[] masterKey;
 
-    LedgerDescriptorImpl(byte[] masterKey, long ledgerId, EntryLogger entryLogger, LedgerCache
ledgerCache) {
+    LedgerDescriptorImpl(byte[] masterKey, long ledgerId, LedgerStorage ledgerStorage) {
         this.masterKey = masterKey;
         this.ledgerId = ledgerId;
-        this.entryLogger = entryLogger;
-        this.ledgerCache = ledgerCache;
+        this.ledgerStorage = ledgerStorage;
     }
 
     @Override
@@ -78,36 +76,13 @@ public class LedgerDescriptorImpl extend
         if (ledgerId != this.ledgerId) {
             throw new IOException("Entry for ledger " + ledgerId + " was sent to " + this.ledgerId);
         }
-        long entryId = entry.getLong();
         entry.rewind();
 
-        /*
-         * Log the entry
-         */
-        long pos = entryLogger.addEntry(ledgerId, entry);
-
-
-        /*
-         * Set offset of entry id to be the current ledger position
-         */
-        ledgerCache.putEntryOffset(ledgerId, entryId, pos);
-        return entryId;
+        return ledgerStorage.addEntry(entry);
     }
 
     @Override
     ByteBuffer readEntry(long entryId) throws IOException {
-        long offset;
-        /*
-         * If entryId is -1, then return the last written.
-         */
-        if (entryId == -1) {
-            entryId = ledgerCache.getLastEntry(ledgerId);
-        }
-
-        offset = ledgerCache.getEntryOffset(ledgerId, entryId);
-        if (offset == 0) {
-            throw new Bookie.NoEntryException(ledgerId, entryId);
-        }
-        return ByteBuffer.wrap(entryLogger.readEntry(ledgerId, entryId, offset));
+        return ledgerStorage.getEntry(ledgerId, entryId);
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java?rev=1327027&r1=1327026&r2=1327027&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java
Tue Apr 17 10:18:55 2012
@@ -28,9 +28,8 @@ import java.nio.ByteBuffer;
  * to write entries to a ledger and read entries from a ledger.
  */
 public class LedgerDescriptorReadOnlyImpl extends LedgerDescriptorImpl {
-    LedgerDescriptorReadOnlyImpl(long ledgerId, EntryLogger entryLogger,
-            LedgerCache ledgerCache) {
-        super(null, ledgerId, entryLogger, ledgerCache);
+    LedgerDescriptorReadOnlyImpl(long ledgerId, LedgerStorage storage) {
+        super(null, ledgerId, storage);
     }
 
     @Override

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java?rev=1327027&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
(added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
Tue Apr 17 10:18:55 2012
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.bookkeeper.jmx.BKMBeanInfo;
+
+/**
+ * Interface for storing ledger data
+ * on persistant storage.
+ */
+interface LedgerStorage {
+    /**
+     * Start any background threads
+     * belonging to the storage system. For example,
+     * garbage collection.
+     */
+    void start();
+
+    /**
+     * Cleanup and free any resources
+     * being used by the storage system.
+     */
+    void shutdown() throws InterruptedException;
+
+    /**
+     * Whether a ledger exists
+     */
+    boolean ledgerExists(long ledgerId) throws IOException;
+
+    /**
+     * Set the master key for a ledger
+     */
+    void setMasterKey(long ledgerId, byte[] masterKey) throws IOException;
+
+    /**
+     * Get the master key for a ledger
+     * @throws IOException if there is an error reading the from the ledger
+     * @throws BookieException if no such ledger exists
+     */
+    byte[] readMasterKey(long ledgerId) throws IOException, BookieException;
+
+    /**
+     * Add an entry to the storage.
+     * @return the entry id of the entry added
+     */
+    long addEntry(ByteBuffer entry) throws IOException;
+
+    /**
+     * Read an entry from storage
+     */
+    ByteBuffer getEntry(long ledgerId, long entryId) throws IOException;
+
+    /**
+     * Whether there is data in the storage which needs to be flushed
+     */
+    boolean isFlushRequired();
+
+    /**
+     * Flushes all data in the storage. Once this is called,
+     * add data written to the LedgerStorage up until this point
+     * has been persisted to perminant storage
+     */
+    void flush() throws IOException;
+
+    /**
+     * Get the JMX management bean for this LedgerStorage
+     */
+    BKMBeanInfo getJMXBean();
+}
\ No newline at end of file



Mime
View raw message