zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1576883 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ bookkeeper-...
Date Wed, 12 Mar 2014 20:26:59 GMT
Author: ivank
Date: Wed Mar 12 20:26:58 2014
New Revision: 1576883

URL: http://svn.apache.org/r1576883
Log:
BOOKKEEPER-432: Improve performance of entry log range read per ledger entries (yixue, sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListArena.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Mar 12 20:26:58 2014
@@ -176,6 +176,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-740: AutoRecoveryMainTest#testAutoRecoverySessionLoss is failing (Rakesh via sijie)
 
+        BOOKKEEPER-432: Improve performance of entry log range read per ledger entries (yixue, sijie via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Wed Mar 12 20:26:58 2014
@@ -437,9 +437,16 @@ public class Bookie extends BookieCritic
         this.ledgerDirsManager.init();
         // instantiate the journal
         journal = new Journal(conf, ledgerDirsManager);
-        ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
-                                                     ledgerDirsManager, indexDirsManager,
-                                                     journal);
+        // Check the type of storage.
+        if (conf.getSortedLedgerStorageEnabled()) {
+            ledgerStorage = new SortedLedgerStorage(conf, ledgerManager,
+                                                    ledgerDirsManager, indexDirsManager,
+                                                    journal);
+        } else {
+            ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
+                                                         ledgerDirsManager, indexDirsManager,
+                                                         journal);
+        }
         syncThread = new SyncThread(conf, getLedgerDirsListener(),
                                     ledgerStorage, journal);
 

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java?rev=1576883&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java Wed Mar 12 20:26:58 2014
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+
+/**
+ * Interface plugged into caching to receive callback notifications
+ */
+public interface CacheCallback {
+    /**
+     * Process notification that cache size limit reached.
+     */
+    public void onSizeLimitReached() throws IOException;
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java?rev=1576883&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java Wed Mar 12 20:26:58 2014
@@ -0,0 +1,129 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.nio.ByteBuffer;
+
+/**
+ * An entry Key/Value.
+ * EntryKeyValue wraps a byte array and takes offsets and lengths into the array to
+ * interpret the content as entry blob.
+ */
+public class EntryKeyValue extends EntryKey {
+    final private byte [] bytes;
+    private int offset = 0; // start offset of entry blob
+    private int length = 0; // length of entry blob
+
+    /**
+    * @return The byte array backing this EntryKeyValue.
+    */
+    public byte [] getBuffer() {
+        return this.bytes;
+    }
+
+    /**
+    * @return Offset into {@link #getBuffer()} at which the EntryKeyValue starts.
+    */
+    public int getOffset() {
+        return this.offset;
+    }
+
+    /**
+    * @return Length of bytes this EntryKeyValue occupies in {@link #getBuffer()}.
+    */
+    public int getLength() {
+        return this.length;
+    }
+
+    /**
+     *
+     * Creates an EntryKeyValue from the start of the specified byte array.
+     * Presumes <code>bytes</code> content contains the value portion of an EntryKeyValue.
+     * @param bytes byte array
+     */
+    public EntryKeyValue(long ledgerId, long entryId, final byte [] bytes) {
+        this(ledgerId, entryId, bytes, 0, bytes.length);
+    }
+
+    /**
+     *
+     * Creates an EntryKeyValue from the given range of the specified byte array.
+     * Presumes that range of <code>bytes</code> contains the value portion of an EntryKeyValue.
+     * @param bytes byte array
+     * @param offset offset in bytes as start of blob
+     * @param length of blob
+     */
+    public EntryKeyValue(long ledgerId, long entryId, final byte [] bytes, int offset, int length) {
+        super(ledgerId, entryId);
+        this.bytes = bytes;
+        this.offset = offset;
+        this.length = length;
+    }
+
+    /**
+    * Returns the blob wrapped in a new <code>ByteBuffer</code>.
+    *
+    * @return the value
+    */
+    public ByteBuffer getValueAsByteBuffer() {
+        return ByteBuffer.wrap(getBuffer(), getOffset(), getLength());
+    }
+
+    /**
+    * Write EntryKeyValue blob into the provided byte buffer.
+    *
+    * @param dst the bytes buffer to use
+    *
+    * @return The number of useful bytes in the buffer.
+    *
+    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
+    * remaining in the buffer
+    */
+    int writeToByteBuffer(ByteBuffer dst) {
+        if (dst.remaining() < getLength()) {
+            throw new IllegalArgumentException("Buffer size " + dst.remaining() + " < " + getLength());
+        }
+
+        dst.put(getBuffer(), getOffset(), getLength());
+        return getLength();
+    }
+
+    /**
+    * String representation
+    */
+    public String toString() {
+        return ledgerId + ":" + entryId;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        // since this entry is identified by (lid, eid)
+        // so just use {@link org.apache.bookkeeper.bookie.EntryKey#equals}.
+        return super.equals(other);
+    }
+
+    @Override
+    public int hashCode() {
+        // since this entry is identified by (lid, eid)
+        // so just use {@link org.apache.bookkeeper.bookie.EntryKey#hashCode} as the hash code.
+        return super.hashCode();
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java?rev=1576883&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java Wed Mar 12 20:26:58 2014
@@ -0,0 +1,380 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * The EntryMemTable holds the in-memory representation of the entries not yet flushed.
+ * When asked to flush, current EntrySkipList is moved to snapshot and is cleared.
+ * We continue to serve edits out of new EntrySkipList and backing snapshot until
+ * flusher reports in that the flush succeeded. At that point we let the snapshot go.
+ */
+public class EntryMemTable {
+    private static Logger Logger = LoggerFactory.getLogger(Journal.class);
+
+    /**
+     * Entry skip list
+     */
+    static class EntrySkipList extends ConcurrentSkipListMap<EntryKey, EntryKeyValue> {
+        final Checkpoint cp;
+        static final EntrySkipList EMPTY_VALUE = new EntrySkipList(Checkpoint.MAX) {
+            @Override
+            public boolean isEmpty() {
+                return true;
+            }
+        };
+
+        EntrySkipList(final Checkpoint cp) {
+            super(EntryKey.COMPARATOR);
+            this.cp = cp;
+        }
+
+        int compareTo(final Checkpoint cp) {
+            return this.cp.compareTo(cp);
+        }
+
+        @Override
+        public EntryKeyValue put(EntryKey k, EntryKeyValue v) {
+            return putIfAbsent(k, v);
+        }
+
+        @Override
+        public EntryKeyValue putIfAbsent(EntryKey k, EntryKeyValue v) {
+            assert k.equals(v);
+            return super.putIfAbsent(v, v);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return this == o;
+        }
+    }
+
+    volatile EntrySkipList kvmap;
+
+    // Snapshot of EntryMemTable.  Made for flusher.
+    volatile EntrySkipList snapshot;
+
+    final ServerConfiguration conf;
+    final CheckpointSource checkpointSource;
+
+    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    // Used to track own data size
+    final AtomicLong size;
+
+    final long skipListSizeLimit;
+
+    SkipListArena allocator;
+
+    private EntrySkipList newSkipList() {
+        return new EntrySkipList(checkpointSource.newCheckpoint());
+    }
+
+    /**
+    * Constructor.
+    * @param conf Server configuration
+    */
+    public EntryMemTable(final ServerConfiguration conf, final CheckpointSource source) {
+        this.checkpointSource = source;
+        this.kvmap = newSkipList();
+        this.snapshot = EntrySkipList.EMPTY_VALUE;
+        this.conf = conf;
+        this.size = new AtomicLong(0);
+        this.allocator = new SkipListArena(conf);
+        // skip list size limit
+        this.skipListSizeLimit = conf.getSkipListSizeLimit();
+    }
+
+    void dump() {
+        for (EntryKey key: this.kvmap.keySet()) {
+            Logger.info(key.toString());
+        }
+        for (EntryKey key: this.snapshot.keySet()) {
+            Logger.info(key.toString());
+        }
+    }
+
+    Checkpoint snapshot() throws IOException {
+        return snapshot(Checkpoint.MAX);
+    }
+
+    /**
+     * Snapshot current EntryMemTable. if given <i>oldCp</i> is older than current checkpoint,
+     * we don't do any snapshot. If snapshot happened, we return the checkpoint of the snapshot.
+     *
+     * @param oldCp
+     *          checkpoint
+     * @return checkpoint of the snapshot, null means no snapshot
+     * @throws IOException
+     */
+    Checkpoint snapshot(Checkpoint oldCp) throws IOException {
+        Checkpoint cp = null;
+        // No-op if snapshot currently has entries
+        if (this.snapshot.isEmpty() &&
+                this.kvmap.compareTo(oldCp) < 0) {
+            this.lock.writeLock().lock();
+            try {
+                if (this.snapshot.isEmpty() && !this.kvmap.isEmpty()
+                        && this.kvmap.compareTo(oldCp) < 0) {
+                    this.snapshot = this.kvmap;
+                    this.kvmap = newSkipList();
+                    // get the checkpoint of the memtable.
+                    cp = this.kvmap.cp;
+                    // Reset heap to not include any keys
+                    this.size.set(0);
+                    // Reset allocator so we get a fresh buffer for the new EntryMemTable
+                    this.allocator = new SkipListArena(conf);
+                }
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        }
+        return cp;
+    }
+
+    /**
+     * Flush snapshot and clear it.
+     */
+    long flush(final SkipListFlusher flusher) throws IOException {
+        return flushSnapshot(flusher, Checkpoint.MAX);
+    }
+
+    /**
+     * Flush memtable until checkpoint.
+     *
+     * @param checkpoint
+     *          all data before this checkpoint need to be flushed.
+     */
+    public long flush(SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
+        long size = flushSnapshot(flusher, checkpoint);
+        if (null != snapshot(checkpoint)) {
+            size += flushSnapshot(flusher, checkpoint);
+        }
+        return size;
+    }
+
+    /**
+     * Flush the snapshot and clear it iff its data is before the checkpoint.
+     * This is the only function that changes a non-empty this.snapshot.
+     */
+    private long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
+        long size = 0;
+        if (this.snapshot.compareTo(checkpoint) < 0) {
+            long ledger, ledgerGC = -1;
+            synchronized (this) {
+                EntrySkipList keyValues = this.snapshot;
+                if (keyValues.compareTo(checkpoint) < 0) {
+                    for (EntryKey key : keyValues.keySet()) {
+                        EntryKeyValue kv = (EntryKeyValue)key;
+                        size += kv.getLength();
+                        ledger = kv.getLedgerId();
+                        if (ledgerGC != ledger) {
+                            try {
+                                flusher.process(ledger, kv.getEntryId(), kv.getValueAsByteBuffer());
+                            } catch (NoLedgerException exception) {
+                                ledgerGC = ledger;
+                            }
+                        }
+                    }
+                    clearSnapshot(keyValues);
+                }
+            }
+        }
+
+        return size;
+    }
+
+    /**
+     * The passed snapshot was successfully persisted; it can be let go.
+     * @param keyValues The snapshot to clean out.
+     * @see {@link #snapshot()}
+     */
+    private void clearSnapshot(final EntrySkipList keyValues) {
+        // Caller makes sure that keyValues not empty
+        assert !keyValues.isEmpty();
+        this.lock.writeLock().lock();
+        try {
+            // create a new snapshot and let the old one go.
+            assert this.snapshot == keyValues;
+            this.snapshot = EntrySkipList.EMPTY_VALUE;
+        } finally {
+            this.lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Throttling writer w/ 1 ms delay
+     */
+    private void throttleWriters() {
+        try {
+            Thread.sleep(1);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+    * Write an update
+    * @param entry
+    * @return approximate size of the passed key and value.
+    */
+    public long addEntry(long ledgerId, long entryId, final ByteBuffer entry, final CacheCallback cb)
+            throws IOException {
+        long size = 0;
+        if (isSizeLimitReached()) {
+            Checkpoint cp = snapshot();
+            if (null != cp) {
+                cb.onSizeLimitReached();
+            } else {
+                throttleWriters();
+            }
+        }
+
+        this.lock.readLock().lock();
+        try {
+            EntryKeyValue toAdd = cloneWithAllocator(ledgerId, entryId, entry);
+            size = internalAdd(toAdd);
+        } finally {
+            this.lock.readLock().unlock();
+        }
+
+        return size;
+    }
+
+    /**
+    * Internal version of add() that doesn't clone KVs with the
+    * allocator, and doesn't take the lock.
+    *
+    * Callers should ensure they already have the read lock taken
+    */
+    private long internalAdd(final EntryKeyValue toAdd) throws IOException {
+        long sizeChange = 0;
+        if (kvmap.putIfAbsent(toAdd, toAdd) == null) {
+            sizeChange = toAdd.getLength();
+            size.addAndGet(sizeChange);
+        }
+        return sizeChange;
+    }
+
+    // Wraps entry's readable bytes: shares the backing array if there is one, else copies them out.
+    private EntryKeyValue newEntry(long ledgerId, long entryId, final ByteBuffer entry) {
+        byte[] buf;
+        int offset = 0;
+        int length = entry.remaining();
+        if (entry.hasArray()) {
+            buf = entry.array();
+            // remaining() counts from position(), so the blob starts at arrayOffset() + position()
+            offset = entry.arrayOffset() + entry.position();
+        } else {
+            buf = new byte[length];
+            entry.get(buf);
+        }
+        return new EntryKeyValue(ledgerId, entryId, buf, offset, length);
+    }
+
+    private EntryKeyValue cloneWithAllocator(long ledgerId, long entryId, final ByteBuffer entry) {
+        int len = entry.remaining();
+        SkipListArena.MemorySlice alloc = allocator.allocateBytes(len);
+        if (alloc == null) {
+            // The allocation was too large, allocator decided
+            // not to do anything with it.
+            return newEntry(ledgerId, entryId, entry);
+        }
+
+        assert alloc.getData() != null;
+        entry.get(alloc.getData(), alloc.getOffset(), len);
+        return new EntryKeyValue(ledgerId, entryId, alloc.getData(), alloc.getOffset(), len);
+    }
+
+    /**
+     * Find the entry with given key
+     * @param ledgerId
+     * @param entryId
+     * @return the entry kv or null if none found.
+     */
+    public EntryKeyValue getEntry(long ledgerId, long entryId) throws IOException {
+        EntryKey key = new EntryKey(ledgerId, entryId);
+        EntryKeyValue value = null;
+        this.lock.readLock().lock();
+        try {
+            value = this.kvmap.get(key);
+            if (value == null) {
+                value = this.snapshot.get(key);
+            }
+        } finally {
+            this.lock.readLock().unlock();
+        }
+
+        return value;
+    }
+
+    /**
+     * Find the last entry with the given ledger key
+     * @param ledgerId
+     * @return the entry kv or null if none found.
+     */
+    public EntryKeyValue getLastEntry(long ledgerId) throws IOException {
+        EntryKey result = null;
+        EntryKey key = new EntryKey(ledgerId, Long.MAX_VALUE);
+        this.lock.readLock().lock();
+        try {
+            result = this.kvmap.floorKey(key);
+            if (result == null || result.getLedgerId() != ledgerId) {
+                result = this.snapshot.floorKey(key);
+            }
+        } finally {
+            this.lock.readLock().unlock();
+        }
+
+        if (result == null || result.getLedgerId() != ledgerId) {
+            return null;
+        }
+        return (EntryKeyValue)result;
+    }
+
+    /**
+     * Check if the entire heap usage for this EntryMemTable exceeds limit
+     */
+    boolean isSizeLimitReached() {
+        return size.get() >= skipListSizeLimit;
+    }
+
+    /**
+     * Check if there is data in the mem-table
+     * @return
+     */
+    boolean isEmpty() {
+        return size.get() == 0 && snapshot.isEmpty();
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Wed Mar 12 20:26:58 2014
@@ -46,22 +46,22 @@ class InterleavedLedgerStorage implement
     private final static Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class);
 
     // Hold the last checkpoint
-    static class CheckpointHolder {
+    protected static class CheckpointHolder {
         Checkpoint lastCheckpoint = Checkpoint.MAX;
 
-        synchronized void setNextCheckpoint(Checkpoint cp) {
+        protected synchronized void setNextCheckpoint(Checkpoint cp) {
             if (Checkpoint.MAX.equals(lastCheckpoint) || lastCheckpoint.compareTo(cp) < 0) {
                 lastCheckpoint = cp;
             }
         }
 
-        synchronized void clearLastCheckpoint(Checkpoint done) {
+        protected synchronized void clearLastCheckpoint(Checkpoint done) {
             if (0 == lastCheckpoint.compareTo(done)) {
                 lastCheckpoint = Checkpoint.MAX;
             }
         }
 
-        synchronized Checkpoint getLastCheckpoint() {
+        protected synchronized Checkpoint getLastCheckpoint() {
             return lastCheckpoint;
         }
     }
@@ -69,7 +69,7 @@ class InterleavedLedgerStorage implement
     EntryLogger entryLogger;
     LedgerCache ledgerCache;
     private final CheckpointSource checkpointSource;
-    private final CheckpointHolder checkpointHolder = new CheckpointHolder();
+    protected final CheckpointHolder checkpointHolder = new CheckpointHolder();
 
     // A sorted map to stored all active ledger ids
     protected final SnapshotMap<Long, Boolean> activeLedgers;

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListArena.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListArena.java?rev=1576883&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListArena.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListArena.java Wed Mar 12 20:26:58 2014
@@ -0,0 +1,238 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * SkipList allocation buffer to reduce memory fragmentation.
+ * Adapted from HBase project.
+ * <p>
+ * The SkipListArena is basically a bump-the-pointer allocator that allocates
+ * big (default 2MB) byte[] chunks and then hands them out to threads that
+ * request slices into the array.
+ * <p>
+ * The purpose of this class is to combat heap fragmentation in the
+ * bookie. By ensuring that all KeyValues in a given SkipList refer
+ * only to large chunks of contiguous memory, we ensure that large blocks
+ * get freed up when the SkipList is flushed.
+ * <p>
+ * Without the Arena, the byte arrays allocated during insertion end up
+ * interleaved throughout the heap, and the old generation gets progressively
+ * more fragmented until a stop-the-world compacting collection occurs.
+ * <p>
+ */
+public class SkipListArena {
+    private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
+
+    final int chunkSize;
+
+    final int maxAlloc;
+
+    public SkipListArena(ServerConfiguration cfg) {
+        chunkSize = cfg.getSkipListArenaChunkSize();
+        // Clamp instead of assert: a request above chunkSize can never fit a chunk, so it must get null.
+        maxAlloc = Math.min(cfg.getSkipListArenaMaxAllocSize(), chunkSize);
+    }
+
+    /**
+    * Allocate a slice of the given length.
+    *
+    * If the size is larger than the maximum size specified for this
+    * allocator, returns null.
+    */
+    public MemorySlice allocateBytes(int size) {
+        assert size >= 0;
+
+        // Callers should satisfy large allocations directly from JVM since they
+        // don't cause fragmentation as badly.
+        if (size > maxAlloc) {
+            return null;
+        }
+
+        while (true) {
+            Chunk c = getCurrentChunk();
+
+            // Try to allocate from this chunk
+            int allocOffset = c.alloc(size);
+            if (allocOffset != -1) {
+                // We succeeded - this is the common case - small alloc
+                // from a big buffer
+                return new MemorySlice(c.data, allocOffset);
+            }
+
+            // not enough space!
+            // try to retire this chunk
+            retireCurrentChunk(c);
+        }
+    }
+
+    /**
+    * Try to retire the current chunk if it is still there.
+    */
+    private void retireCurrentChunk(Chunk c) {
+        curChunk.compareAndSet(c, null);
+        // If the CAS fails, that means that someone else already
+        // retired the chunk for us.
+    }
+
+    /**
+    * Get the current chunk, or, if there is no current chunk,
+    * allocate a new one from the JVM.
+    */
+    private Chunk getCurrentChunk() {
+        while (true) {
+            // Try to get the chunk
+            Chunk c = curChunk.get();
+            if (c != null) {
+                return c;
+            }
+
+            // No current chunk, so we want to allocate one. We race
+            // against other allocators to CAS in an uninitialized chunk
+            // (which is cheap to allocate)
+            c = new Chunk(chunkSize);
+            if (curChunk.compareAndSet(null, c)) {
+                c.init();
+                return c;
+            }
+            // lost race
+        }
+    }
+
+    /**
+    * A chunk of memory out of which allocations are sliced.
+    */
+    private static class Chunk {
+        /** Backing array; remains null until init() has succeeded */
+        private byte[] data;
+
+        private static final int UNINITIALIZED = -1;
+        private static final int OOM = -2;
+        /**
+         * Offset for the next allocation, or one of the sentinel values
+         * UNINITIALIZED (init() has not completed yet) and OOM (init() failed).
+         */
+        private final AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
+
+        /** Total number of allocations satisfied from this buffer */
+        private final AtomicInteger allocCount = new AtomicInteger();
+
+        /** Size of chunk in bytes */
+        private final int size;
+
+        /**
+         * Create an uninitialized chunk. Note that memory is not allocated yet, so
+         * this is cheap.
+         * @param size in bytes
+         */
+        private Chunk(int size) {
+            this.size = size;
+        }
+
+        /**
+         * Actually claim the memory for this chunk. This should only be called from
+         * the thread that constructed the chunk. It is thread-safe against other
+         * threads calling alloc(), who will spin until the allocation is complete.
+         */
+        public void init() {
+            assert nextFreeOffset.get() == UNINITIALIZED;
+            try {
+                data = new byte[size];
+            } catch (OutOfMemoryError e) {
+                boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+                assert failInit; // should be true.
+                throw e;
+            }
+            // Mark that it's ready for use
+            boolean okInit = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
+            assert okInit;    // single-threaded call
+        }
+
+        /**
+         * Try to allocate <code>size</code> bytes from the chunk.
+         * @return the offset of the allocation, or -1 if it does not fit or init() failed
+         */
+        public int alloc(int size) {
+            while (true) {
+                int oldOffset = nextFreeOffset.get();
+                if (oldOffset == UNINITIALIZED) {
+                    // Other thread allocating it right now
+                    Thread.yield();
+                    continue;
+                }
+                if (oldOffset == OOM) {
+                    return -1;
+                }
+                // Compare against the remaining room so the check cannot overflow
+                if (size > data.length - oldOffset) {
+                    return -1; // alloc doesn't fit
+                }
+
+                // Try to atomically claim this chunk
+                if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
+                    // we got the alloc
+                    allocCount.incrementAndGet();
+                    return oldOffset;
+                }
+                // lost race
+            }
+        }
+
+        @Override
+        public String toString() {
+            int next = nextFreeOffset.get(); // negative while there is no usable buffer
+            return "Chunk@" + System.identityHashCode(this) + ": used(" + allocCount.get()
+                + "), free(" + (next < 0 ? 0 : size - next) + ")";
+        }
+    }
+
+    /**
+    * Handle returned for one allocation: the backing array of the chunk the
+    * allocation was carved from, plus the index in that array at which the
+    * slice starts.
+    */
+    public static class MemorySlice {
+        private final byte[] data;
+        private final int offset;
+
+        private MemorySlice(byte[] backing, int start) {
+            data = backing;
+            offset = start;
+        }
+
+        byte[] getData() {
+            return this.data;
+        }
+
+        int getOffset() {
+            return this.offset;
+        }
+
+        @Override
+        public String toString() {
+            return "Slice:capacity(" + data.length + "), offset(" + offset + ")";
+        }
+    }
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java?rev=1576883&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java Wed Mar 12 20:26:58 2014
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Callback that receives the entries flushed out of the skip list.
+ */
+public interface SkipListFlusher {
+    /**
+     * Handle a single flushed entry.
+     *
+     * @param ledgerId
+     *          id of the ledger the entry belongs to
+     * @param entryId
+     *          id of the entry within that ledger
+     * @param entry
+     *          buffer holding the entry
+     * @throws IOException if the entry cannot be processed
+     */
+    void process(long ledgerId, long entryId, ByteBuffer entry) throws IOException;
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java?rev=1576883&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java Wed Mar 12 20:26:58 2014
@@ -0,0 +1,185 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ledger storage that keeps newly added entries in an {@link EntryMemTable}
+ * and passes them on to {@code processEntry} whenever the mem-table is flushed
+ * (on checkpoint, on explicit flush, or once its size limit is reached).
+ */
+public class SortedLedgerStorage extends InterleavedLedgerStorage
+        implements LedgerStorage, CacheCallback, SkipListFlusher {
+    private static final Logger LOG = LoggerFactory.getLogger(SortedLedgerStorage.class);
+
+    private final EntryMemTable memTable;
+    private final ScheduledExecutorService scheduler;
+
+    public SortedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
+                               LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
+                               final CheckpointSource checkpointSource) throws IOException {
+        super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, null);
+        this.memTable = new EntryMemTable(conf, checkpointSource);
+        this.scheduler = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat("SortedLedgerStorage-%d")
+                .setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY)/2).build());
+    }
+
+    @Override
+    public void start() {
+        try {
+            flush();
+        } catch (IOException e) {
+            LOG.error("Exception thrown while flushing ledger cache.", e);
+        }
+        super.start();
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException {
+        // Wait for any jobs currently scheduled to be completed and then shut down.
+        scheduler.shutdown();
+        if (!scheduler.awaitTermination(3, TimeUnit.SECONDS)) {
+            scheduler.shutdownNow();
+        }
+        super.shutdown();
+    }
+
+    @Override
+    public boolean ledgerExists(long ledgerId) throws IOException {
+        // Done this way because checking the skip list is an O(logN) operation compared to
+        // the O(1) for the ledgerCache.
+        if (!super.ledgerExists(ledgerId)) {
+            EntryKeyValue kv = memTable.getLastEntry(ledgerId);
+            if (null == kv) {
+                return super.ledgerExists(ledgerId);
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public long addEntry(ByteBuffer entry) throws IOException {
+        long ledgerId = entry.getLong();
+        long entryId = entry.getLong();
+        entry.rewind();
+        memTable.addEntry(ledgerId, entryId, entry, this);
+        return entryId;
+    }
+
+    /**
+     * Get the last entry of a ledger, looking in the mem-table before the ledger cache+index.
+     * @param ledgerId ledger to look up
+     * @return the value of the mem-table's last entry, or else the result of the superclass lookup
+     */
+    private ByteBuffer getLastEntryId(long ledgerId) throws IOException {
+        EntryKeyValue kv = memTable.getLastEntry(ledgerId);
+        if (null != kv) {
+            return kv.getValueAsByteBuffer();
+        }
+        // If it doesn't exist in the skip list, then fallback to the ledger cache+index.
+        return super.getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
+    }
+
+    @Override
+    public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
+        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+            return getLastEntryId(ledgerId);
+        }
+        try {
+            return super.getEntry(ledgerId, entryId);
+        } catch (Bookie.NoEntryException nee) {
+            EntryKeyValue kv = memTable.getEntry(ledgerId, entryId);
+            if (null != kv) {
+                return kv.getValueAsByteBuffer();
+            }
+            // The entry might have been flushed since we last checked, so query the ledger cache again.
+            // If the entry truly doesn't exist, then this will throw a NoEntryException
+            return super.getEntry(ledgerId, entryId);
+        }
+    }
+
+    @Override
+    public Checkpoint checkpoint(final Checkpoint checkpoint) throws IOException {
+        Checkpoint lastCheckpoint = checkpointHolder.getLastCheckpoint();
+        // if checkpoint is less than last checkpoint, we don't need to do checkpoint again.
+        if (lastCheckpoint.compareTo(checkpoint) > 0) {
+            return lastCheckpoint;
+        }
+        memTable.flush(this, checkpoint);
+        return super.checkpoint(checkpoint);
+    }
+
+    @Override
+    public void process(long ledgerId, long entryId, ByteBuffer buffer) throws IOException {
+        processEntry(ledgerId, entryId, buffer, false);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        memTable.flush(this, Checkpoint.MAX);
+        super.flush();
+    }
+
+    // CacheCallback functions.
+    @Override
+    public void onSizeLimitReached() throws IOException {
+        // when size limit reached, we get the previous checkpoint from snapshot mem-table.
+        // at this point, we are safer to schedule a checkpoint, since the entries added before
+        // this checkpoint already written to entry logger.
+        // but it would be better not to let mem-table flush to different entry log files,
+        // so we roll entry log files in SortedLedgerStorage itself.
+        // After that, we could make the process writing data to entry logger file not bound with checkpoint.
+        // otherwise, it hurts add performance.
+        scheduler.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Started flushing mem table.");
+                    memTable.flush(SortedLedgerStorage.this);
+                    if (entryLogger.reachEntryLogLimit(0)) {
+                        entryLogger.rollLog();
+                        LOG.info("Rolling entry logger since it reached size limitation");
+                    }
+                } catch (IOException e) {
+                    // TODO: if we failed to flush data, switch the bookie back to readonly mode or shut it down.
+                    LOG.error("Exception thrown while flushing skip list cache.", e);
+                } catch (RuntimeException e) {
+                    // submit() would otherwise park this in a Future that nobody ever inspects.
+                    LOG.error("Unexpected exception thrown while flushing skip list cache.", e);
+                }
+            }
+        });
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Wed Mar 12 20:26:58 2014
@@ -21,11 +21,9 @@ import java.io.File;
 import java.util.List;
 
 import com.google.common.annotations.Beta;
-
+import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang.StringUtils;
 
-import com.google.common.annotations.Beta;
-
 /**
  * Configuration manages server-side settings
  */
@@ -95,6 +93,11 @@ public class ServerConfiguration extends
     protected final static String READ_BUFFER_SIZE = "readBufferSizeBytes";
     protected final static String WRITE_BUFFER_SIZE = "writeBufferSizeBytes";
 
+    protected final static String SORTED_LEDGER_STORAGE_ENABLED = "sortedLedgerStorageEnabled";
+    protected final static String SKIP_LIST_SIZE_LIMIT = "skipListSizeLimit";
+    protected final static String SKIP_LIST_CHUNK_SIZE_ENTRY = "skipListArenaChunkSize";
+    protected final static String SKIP_LIST_MAX_ALLOC_ENTRY = "skipListArenaMaxAllocSize";
+
     /**
      * Construct a default configuration object
      */
@@ -732,7 +735,7 @@ public class ServerConfiguration extends
      * that ledger.
      *
      * @see org.apache.bookkeeper.client.BookKeeper#openLedger
-     * 
+     *
      * @param waitTime time to wait before replicating ledger fragment
      */
     public void setOpenLedgerRereplicationGracePeriod(String waitTime) {
@@ -743,7 +746,7 @@ public class ServerConfiguration extends
      * Get the grace period which the rereplication worker to wait before
      * fencing and rereplicating a ledger fragment which is still being written
      * to, on bookie failure.
-     * 
+     *
      * @return long
      */
     public long getOpenLedgerRereplicationGracePeriod() {
@@ -856,6 +859,76 @@ public class ServerConfiguration extends
         return getInt(NUM_JOURNAL_CALLBACK_THREADS, 1);
     }
 
+    /**
+     * Enable or disable the sorted-ledger storage.
+     * @param enabled whether the sorted-ledger storage is enabled
+     * @return server configuration object.
+     */
+    public ServerConfiguration setSortedLedgerStorageEnabled(boolean enabled) {
+        setProperty(SORTED_LEDGER_STORAGE_ENABLED, enabled);
+        return this;
+    }
+
+    /**
+     * Tell whether the sorted-ledger storage is enabled.
+     *
+     * @return the configured flag, or true when it has not been set
+     */
+    public boolean getSortedLedgerStorageEnabled() {
+        return getBoolean(SORTED_LEDGER_STORAGE_ENABLED, true);
+    }
+
+    /**
+     * Get the limit on the amount of data held in the skip list.
+     *
+     * @return the configured limit, 64MB by default
+     */
+    public long getSkipListSizeLimit() {
+        return getLong(SKIP_LIST_SIZE_LIMIT, 64L * 1024 * 1024);
+    }
+
+    /**
+     * Set the limit on the amount of data held in the skip list.
+     *
+     * @param size skip list size limit.
+     * @return server configuration object.
+     */
+    public ServerConfiguration setSkipListSizeLimit(int size) {
+        this.setProperty(SKIP_LIST_SIZE_LIMIT, size);
+        return this;
+    }
+
+    /**
+     * Get the size, in bytes, of the chunks allocated by the
+     * {@link org.apache.bookkeeper.bookie.SkipListArena}.
+     *
+     * @return the configured chunk size, 4 MB by default
+     */
+    public int getSkipListArenaChunkSize() {
+        return this.getInt(SKIP_LIST_CHUNK_SIZE_ENTRY, 4 * 1024 * 1024);
+    }
+
+    /**
+     * Set the size, in bytes, of the chunks allocated by the
+     * {@link org.apache.bookkeeper.bookie.SkipListArena}.
+     *
+     * @param size chunk size.
+     * @return server configuration object.
+     */
+    public ServerConfiguration setSkipListArenaChunkSize(int size) {
+        this.setProperty(SKIP_LIST_CHUNK_SIZE_ENTRY, size);
+        return this;
+    }
+
+    /**
+     * Get the maximum allocation size, in bytes, configured for the
+     * {@link org.apache.bookkeeper.bookie.SkipListArena}.
+     *
+     * @return the configured maximum allocation size, 128 KB by default
+     */
+    public int getSkipListArenaMaxAllocSize() {
+        return this.getInt(SKIP_LIST_MAX_ALLOC_ENTRY, 128 * 1024);
+    }
 
     /**
      * Should we group journal force writes
@@ -940,10 +1013,10 @@ public class ServerConfiguration extends
      * Set whether the bookie is able to go into read-only mode.
      * If this is set to false, the bookie will shutdown on encountering
      * an error condition.
-     * 
+     *
      * @param enabled whether to enable read-only mode.
-     * 
-     * @return ServerConfiguration 
+     *
+     * @return ServerConfiguration
      */
     public ServerConfiguration setReadOnlyModeEnabled(boolean enabled) {
         setProperty(READ_ONLY_MODE_ENABLED, enabled);
@@ -952,7 +1025,7 @@ public class ServerConfiguration extends
 
     /**
      * Get whether read-only mode is enabled. The default is false.
-     * 
+     *
      * @return boolean
      */
     public boolean isReadOnlyModeEnabled() {
@@ -983,9 +1056,9 @@ public class ServerConfiguration extends
     /**
      * Set the Disk free space threshold as a fraction of the total
      * after which disk will be considered as full during disk check.
-     * 
+     *
      * @param threshold threshold to declare a disk full
-     * 
+     *
      * @return ServerConfiguration
      */
     public ServerConfiguration setDiskUsageThreshold(float threshold) {
@@ -995,7 +1068,7 @@ public class ServerConfiguration extends
 
     /**
      * Returns disk free space threshold. By default it is 0.95.
-     * 
+     *
      * @return float
      */
     public float getDiskUsageThreshold() {
@@ -1004,9 +1077,9 @@ public class ServerConfiguration extends
 
     /**
      * Set the disk checker interval to monitor ledger disk space
-     * 
+     *
      * @param interval interval between disk checks for space.
-     * 
+     *
      * @return ServerConfiguration
      */
     public ServerConfiguration setDiskCheckInterval(int interval) {
@@ -1016,7 +1089,7 @@ public class ServerConfiguration extends
 
     /**
      * Get the disk checker interval
-     * 
+     *
      * @return int
      */
     public int getDiskCheckInterval() {
@@ -1140,4 +1213,14 @@ public class ServerConfiguration extends
         setProperty(JOURNAL_REMOVE_FROM_PAGE_CACHE, enabled);
         return this;
     }
+
+    /**
+     * Check that the settings are consistent with each other.
+     * @throws ConfigurationException if the arena max allocation size exceeds the arena chunk size
+     */
+    public void validate() throws ConfigurationException {
+        if (getSkipListArenaMaxAllocSize() > getSkipListArenaChunkSize()) {
+            throw new ConfigurationException("Arena max allocation size should be smaller than the chunk size.");
+        }
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Wed Mar 12 20:26:58 2014
@@ -262,6 +262,7 @@ public class BookieServer {
         throws IllegalArgumentException {
         try {
             conf.loadConf(new File(confFile).toURI().toURL());
+            conf.validate();
         } catch (MalformedURLException e) {
             LOG.error("Could not open configuration file: " + confFile, e);
             throw new IllegalArgumentException();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml Wed Mar 12 20:26:58 2014
@@ -20,4 +20,17 @@
     <!-- generated code, we can't be held responsible for findbugs in it //-->
     <Class name="~org\.apache\.bookkeeper\.proto\.DataFormats.*" />
   </Match>
+  <Match>
+    <!-- It is safe to store a reference to the external byte array here,
+         since the bytes come from a slab. //-->
+    <Class name="org.apache.bookkeeper.bookie.EntryKeyValue" />
+    <Bug pattern="EI_EXPOSE_REP2" />
+  </Match>
+  <Match>
+    <!-- It is safe to expose the reference to the internal byte array here,
+         since the bytes come from a slab. //-->
+    <Class name="org.apache.bookkeeper.bookie.EntryKeyValue" />
+    <Method name="getBuffer" />
+    <Bug pattern="EI_EXPOSE_REP" />
+  </Match>
 </FindBugsFilter>

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java Wed Mar 12 20:26:58 2014
@@ -102,6 +102,7 @@ public class CompactionTest extends Book
         baseConf.setMinorCompactionInterval(minorCompactionInterval);
         baseConf.setMajorCompactionInterval(majorCompactionInterval);
         baseConf.setEntryLogFilePreAllocationEnabled(false);
+        baseConf.setSortedLedgerStorageEnabled(false);
 
         super.setUp();
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java Wed Mar 12 20:26:58 2014
@@ -211,12 +211,9 @@ public class EntryLogTest extends TestCa
         bookie.getLedgerDirsManager().addToFilledDirs(entryLogger.currentDir);
         ledgerStorage.addEntry(generateEntry(3, 1));
         // Verify written entries
-        Assert.assertArrayEquals(generateEntry(1, 1).array(), ledgerStorage
-                .getEntry(1, 1).array());
-        Assert.assertArrayEquals(generateEntry(2, 1).array(), ledgerStorage
-                .getEntry(2, 1).array());
-        Assert.assertArrayEquals(generateEntry(3, 1).array(), ledgerStorage
-                .getEntry(3, 1).array());
+        Assert.assertTrue(0 == generateEntry(1, 1).compareTo(ledgerStorage.getEntry(1, 1)));
+        Assert.assertTrue(0 == generateEntry(2, 1).compareTo(ledgerStorage.getEntry(2, 1)));
+        Assert.assertTrue(0 == generateEntry(3, 1).compareTo(ledgerStorage.getEntry(3, 1)));
     }
 
     @After

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java Wed Mar 12 20:26:58 2014
@@ -220,7 +220,7 @@ public class LedgerCacheTest extends Tes
 
             // flush all first to clean previous dirty ledgers
             ledgerCache.flushLedger(true);
-            // flush all 
+            // flush all
             ledgerCache.flushLedger(true);
 
             // delete serveral ledgers
@@ -320,7 +320,8 @@ public class LedgerCacheTest extends Tes
             .setJournalDirName(journalDir.getPath())
             .setLedgerDirNames(new String[] { ledgerDir.getPath() })
             .setFlushInterval(1000)
-            .setPageLimit(1);
+            .setPageLimit(1)
+            .setSortedLedgerStorageEnabled(false);
 
         Bookie b = new Bookie(conf);
         b.start();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java Wed Mar 12 20:26:58 2014
@@ -58,6 +58,7 @@ public class LedgerDeleteTest extends Mu
         baseConf.setEntryLogSizeLimit(2 * 1024 * 1024L);
         baseConf.setGcWaitTime(1000);
         baseConf.setEntryLogFilePreAllocationEnabled(false);
+        baseConf.setSortedLedgerStorageEnabled(false);
         super.setUp();
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java Wed Mar 12 20:26:58 2014
@@ -39,6 +39,7 @@ public class ReadOnlyBookieTest extends 
 
     public ReadOnlyBookieTest() {
         super(2);
+        baseConf.setSortedLedgerStorageEnabled(false);
         baseConf.setEntryLogFilePreAllocationEnabled(false);
     }
 



Mime
View raw message