Author: ivank
Date: Wed Mar 12 20:26:58 2014
New Revision: 1576883
URL: http://svn.apache.org/r1576883
Log:
BOOKKEEPER-432: Improve performance of entry log range read per ledger entries (yixue, sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListArena.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Mar 12 20:26:58 2014
@@ -176,6 +176,8 @@ Trunk (unreleased changes)
BOOKKEEPER-740: AutoRecoveryMainTest#testAutoRecoverySessionLoss is failing (Rakesh via sijie)
+ BOOKKEEPER-432: Improve performance of entry log range read per ledger entries (yixue, sijie via ivank)
+
hedwig-server:
BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Wed Mar 12 20:26:58 2014
@@ -437,9 +437,16 @@ public class Bookie extends BookieCritic
this.ledgerDirsManager.init();
// instantiate the journal
journal = new Journal(conf, ledgerDirsManager);
- ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
- ledgerDirsManager, indexDirsManager,
- journal);
+ // Check the type of storage.
+ if (conf.getSortedLedgerStorageEnabled()) {
+ ledgerStorage = new SortedLedgerStorage(conf, ledgerManager,
+ ledgerDirsManager, indexDirsManager,
+ journal);
+ } else {
+ ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
+ ledgerDirsManager, indexDirsManager,
+ journal);
+ }
syncThread = new SyncThread(conf, getLedgerDirsListener(),
ledgerStorage, journal);
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java?rev=1576883&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java Wed Mar 12 20:26:58 2014
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+
+/**
+ * Callback through which a size-bounded cache notifies its owner about
+ * capacity events.
+ *
+ * <p>{@link EntryMemTable#addEntry} invokes {@link #onSizeLimitReached()}
+ * after it has snapshotted its contents because the configured size limit
+ * was reached.
+ */
+public interface CacheCallback {
+    /**
+     * Handles the notification that the cache has reached its size limit.
+     *
+     * @throws IOException if handling the notification fails
+     */
+    void onSizeLimitReached() throws IOException;
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java?rev=1576883&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java Wed Mar 12 20:26:58 2014
@@ -0,0 +1,145 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.nio.ByteBuffer;
+
+/**
+ * An entry key/value pair.
+ *
+ * <p>The key is the (ledgerId, entryId) pair inherited from {@link EntryKey};
+ * the value is the slice {@code bytes[offset, offset + length)} of a byte
+ * array. The array is referenced, not copied, so callers must not modify
+ * that slice after handing it to this class.
+ */
+public class EntryKeyValue extends EntryKey {
+    // Backing array; referenced, not copied.
+    private final byte[] bytes;
+    // Start offset of the entry blob within bytes.
+    private final int offset;
+    // Length of the entry blob in bytes.
+    private final int length;
+
+    /**
+     * @return The byte array backing this EntryKeyValue.
+     */
+    public byte[] getBuffer() {
+        return this.bytes;
+    }
+
+    /**
+     * @return Offset into {@link #getBuffer()} at which the entry blob starts.
+     */
+    public int getOffset() {
+        return this.offset;
+    }
+
+    /**
+     * @return Number of bytes the entry blob occupies in {@link #getBuffer()}.
+     */
+    public int getLength() {
+        return this.length;
+    }
+
+    /**
+     * Creates an EntryKeyValue whose value spans the whole byte array.
+     *
+     * @param ledgerId ledger id of the entry
+     * @param entryId entry id of the entry
+     * @param bytes array holding the entry blob; referenced, not copied
+     */
+    public EntryKeyValue(long ledgerId, long entryId, final byte[] bytes) {
+        this(ledgerId, entryId, bytes, 0, bytes.length);
+    }
+
+    /**
+     * Creates an EntryKeyValue whose value is the slice
+     * {@code bytes[offset, offset + length)}.
+     *
+     * @param ledgerId ledger id of the entry
+     * @param entryId entry id of the entry
+     * @param bytes array holding the entry blob; referenced, not copied
+     * @param offset offset in {@code bytes} at which the blob starts
+     * @param length length of the blob in bytes
+     */
+    public EntryKeyValue(long ledgerId, long entryId, final byte[] bytes, int offset, int length) {
+        super(ledgerId, entryId);
+        this.bytes = bytes;
+        this.offset = offset;
+        this.length = length;
+    }
+
+    /**
+     * Returns the blob wrapped in a new {@link ByteBuffer}.
+     *
+     * <p>The buffer shares the backing array: its position is
+     * {@link #getOffset()} and its limit is {@code getOffset() + getLength()},
+     * so {@code remaining()} equals {@link #getLength()}.
+     *
+     * @return a buffer view of the entry blob
+     */
+    public ByteBuffer getValueAsByteBuffer() {
+        return ByteBuffer.wrap(getBuffer(), getOffset(), getLength());
+    }
+
+    /**
+     * Copies the entry blob into the provided byte buffer at its current
+     * position, advancing that position by {@link #getLength()}.
+     *
+     * @param dst the buffer to write into
+     * @return the number of bytes written, i.e. {@link #getLength()}
+     * @throws IllegalArgumentException if {@code dst} has fewer than
+     *         {@link #getLength()} bytes remaining
+     */
+    int writeToByteBuffer(ByteBuffer dst) {
+        if (dst.remaining() < getLength()) {
+            throw new IllegalArgumentException("Buffer size " + dst.remaining() + " < " + getLength());
+        }
+
+        dst.put(getBuffer(), getOffset(), getLength());
+        return getLength();
+    }
+
+    /**
+     * @return the {@code ledgerId:entryId} pair identifying this entry
+     */
+    @Override
+    public String toString() {
+        return ledgerId + ":" + entryId;
+    }
+
+    /**
+     * Delegates to {@link EntryKey#equals}: an entry is identified by its
+     * (ledgerId, entryId) pair, so the payload does not take part.
+     */
+    @Override
+    public boolean equals(Object other) {
+        return super.equals(other);
+    }
+
+    /**
+     * Delegates to {@link EntryKey#hashCode}, consistent with {@link #equals}.
+     */
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }
+
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java?rev=1576883&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java Wed Mar 12 20:26:58 2014
@@ -0,0 +1,473 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * The EntryMemTable holds the in-memory representation of entries that have not
+ * yet been flushed.
+ *
+ * <p>Taking a snapshot moves the current {@link EntrySkipList} to
+ * {@link #snapshot} and replaces it with a new, empty one. Reads are served from
+ * both the new skip list and the snapshot until a flusher has persisted the
+ * snapshot, at which point the snapshot is released.
+ *
+ * <p>{@link #lock} guards the swapping of {@link #kvmap}, {@link #snapshot} and
+ * {@link #allocator}: adds and lookups hold the read lock, while taking and
+ * clearing a snapshot hold the write lock.
+ */
+public class EntryMemTable {
+    private static final Logger LOG = LoggerFactory.getLogger(EntryMemTable.class);
+
+    /**
+     * Skip list of entries ordered by {@link EntryKey#COMPARATOR} and tagged with
+     * the checkpoint that was current when the list was created.
+     *
+     * <p>Each {@link EntryKeyValue} is stored as both key and value, and an
+     * existing mapping is never replaced.
+     */
+    static class EntrySkipList extends ConcurrentSkipListMap<EntryKey, EntryKeyValue> {
+        final Checkpoint cp;
+
+        // Shared placeholder used while no snapshot is pending; always reports empty.
+        static final EntrySkipList EMPTY_VALUE = new EntrySkipList(Checkpoint.MAX) {
+            @Override
+            public boolean isEmpty() {
+                return true;
+            }
+        };
+
+        EntrySkipList(final Checkpoint cp) {
+            super(EntryKey.COMPARATOR);
+            this.cp = cp;
+        }
+
+        /**
+         * Compares the checkpoint this list was created with against {@code cp}.
+         */
+        int compareTo(final Checkpoint cp) {
+            return this.cp.compareTo(cp);
+        }
+
+        /**
+         * Same as {@link #putIfAbsent}: an existing mapping is never replaced.
+         */
+        @Override
+        public EntryKeyValue put(EntryKey k, EntryKeyValue v) {
+            return putIfAbsent(k, v);
+        }
+
+        /**
+         * Inserts {@code v}, keyed by itself, unless an equal key is already
+         * present. {@code k} must be equal to {@code v}.
+         */
+        @Override
+        public EntryKeyValue putIfAbsent(EntryKey k, EntryKeyValue v) {
+            assert k.equals(v);
+            return super.putIfAbsent(v, v);
+        }
+
+        // Skip lists are compared as instances, not by content.
+        @Override
+        public boolean equals(Object o) {
+            return this == o;
+        }
+
+        // Must agree with the identity-based equals(); also avoids the O(n)
+        // AbstractMap.hashCode() that would otherwise be inherited.
+        @Override
+        public int hashCode() {
+            return System.identityHashCode(this);
+        }
+    }
+
+    // Active skip list that receives new entries.
+    volatile EntrySkipList kvmap;
+
+    // Snapshot of EntryMemTable made for the flusher; EntrySkipList.EMPTY_VALUE when none.
+    volatile EntrySkipList snapshot;
+
+    final ServerConfiguration conf;
+    final CheckpointSource checkpointSource;
+
+    // Read lock: adds and lookups. Write lock: swapping kvmap/snapshot/allocator.
+    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    // Payload bytes added to kvmap since the last snapshot.
+    final AtomicLong size;
+
+    // Once size reaches this limit, addEntry() tries to snapshot kvmap.
+    final long skipListSizeLimit;
+
+    // Arena that entry payloads are copied into; replaced on every snapshot.
+    SkipListArena allocator;
+
+    private EntrySkipList newSkipList() {
+        return new EntrySkipList(checkpointSource.newCheckpoint());
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param conf Server configuration; supplies the size limit and arena settings
+     * @param source checkpoint source used to tag every new skip list
+     */
+    public EntryMemTable(final ServerConfiguration conf, final CheckpointSource source) {
+        this.checkpointSource = source;
+        this.kvmap = newSkipList();
+        this.snapshot = EntrySkipList.EMPTY_VALUE;
+        this.conf = conf;
+        this.size = new AtomicLong(0);
+        this.allocator = new SkipListArena(conf);
+        // skip list size limit
+        this.skipListSizeLimit = conf.getSkipListSizeLimit();
+    }
+
+    /**
+     * Logs the key of every entry, first in the active skip list, then in the snapshot.
+     */
+    void dump() {
+        for (EntryKey key: this.kvmap.keySet()) {
+            LOG.info(key.toString());
+        }
+        for (EntryKey key: this.snapshot.keySet()) {
+            LOG.info(key.toString());
+        }
+    }
+
+    /**
+     * Equivalent to {@code snapshot(Checkpoint.MAX)}.
+     */
+    Checkpoint snapshot() throws IOException {
+        return snapshot(Checkpoint.MAX);
+    }
+
+    /**
+     * Snapshots the current EntryMemTable: moves {@link #kvmap} to {@link #snapshot}
+     * and starts a new, empty skip list. Nothing happens if a previous snapshot is
+     * still pending, if the active skip list is empty, or if it is not older than
+     * {@code oldCp}.
+     *
+     * @param oldCp
+     *          checkpoint the active skip list must be older than
+     * @return checkpoint of the newly created skip list, or null if no snapshot was
+     *         taken. It is created while the write lock is held, i.e. after every
+     *         add into the snapshot has completed.
+     * @throws IOException
+     */
+    Checkpoint snapshot(Checkpoint oldCp) throws IOException {
+        Checkpoint cp = null;
+        // No-op if snapshot currently has entries
+        if (this.snapshot.isEmpty() &&
+                this.kvmap.compareTo(oldCp) < 0) {
+            this.lock.writeLock().lock();
+            try {
+                // Re-check under the write lock: another thread may have snapshotted meanwhile.
+                if (this.snapshot.isEmpty() && !this.kvmap.isEmpty()
+                        && this.kvmap.compareTo(oldCp) < 0) {
+                    this.snapshot = this.kvmap;
+                    this.kvmap = newSkipList();
+                    // get the checkpoint of the memtable.
+                    cp = this.kvmap.cp;
+                    // Reset heap to not include any keys
+                    this.size.set(0);
+                    // Reset allocator so we get a fresh buffer for the new EntryMemTable
+                    this.allocator = new SkipListArena(conf);
+                }
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        }
+        return cp;
+    }
+
+    /**
+     * Flushes the pending snapshot, if any, and clears it.
+     *
+     * @return payload bytes in the flushed snapshot, 0 if nothing was flushed
+     */
+    long flush(final SkipListFlusher flusher) throws IOException {
+        return flushSnapshot(flusher, Checkpoint.MAX);
+    }
+
+    /**
+     * Flushes all data older than {@code checkpoint}: first any pending snapshot,
+     * then the active skip list, if it is older than the checkpoint and could be
+     * snapshotted.
+     *
+     * @param flusher receives every flushed entry
+     * @param checkpoint
+     *          all data before this checkpoint needs to be flushed.
+     * @return total payload bytes flushed
+     */
+    public long flush(SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
+        long size = flushSnapshot(flusher, checkpoint);
+        if (null != snapshot(checkpoint)) {
+            size += flushSnapshot(flusher, checkpoint);
+        }
+        return size;
+    }
+
+    /**
+     * Flushes the snapshot and clears it iff it is older than {@code checkpoint}.
+     * This is the only method that replaces a non-empty {@link #snapshot}.
+     *
+     * <p>Once the flusher throws {@link NoLedgerException} for a ledger, further
+     * entries of that ledger are not passed to it (only the most recent such
+     * ledger is tracked). Skipped entries still count towards the returned size.
+     *
+     * @return payload bytes in the flushed snapshot, 0 if nothing was flushed
+     */
+    private long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
+        long size = 0;
+        if (this.snapshot.compareTo(checkpoint) < 0) {
+            long ledger, ledgerGC = -1;
+            synchronized (this) {
+                EntrySkipList keyValues = this.snapshot;
+                if (keyValues.compareTo(checkpoint) < 0) {
+                    for (EntryKey key : keyValues.keySet()) {
+                        EntryKeyValue kv = (EntryKeyValue)key;
+                        size += kv.getLength();
+                        ledger = kv.getLedgerId();
+                        if (ledgerGC != ledger) {
+                            try {
+                                flusher.process(ledger, kv.getEntryId(), kv.getValueAsByteBuffer());
+                            } catch (NoLedgerException exception) {
+                                ledgerGC = ledger;
+                            }
+                        }
+                    }
+                    clearSnapshot(keyValues);
+                }
+            }
+        }
+
+        return size;
+    }
+
+    /**
+     * The passed snapshot was successfully persisted; it can be let go.
+     * @param keyValues The snapshot to clean out.
+     * @see #snapshot()
+     */
+    private void clearSnapshot(final EntrySkipList keyValues) {
+        // Caller makes sure that keyValues not empty
+        assert !keyValues.isEmpty();
+        this.lock.writeLock().lock();
+        try {
+            // Release the flushed snapshot; EMPTY_VALUE marks that none is pending.
+            assert this.snapshot == keyValues;
+            this.snapshot = EntrySkipList.EMPTY_VALUE;
+        } finally {
+            this.lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Throttles the calling writer by sleeping for 1 ms. Interruption is not
+     * propagated as an exception; the thread's interrupt flag is restored instead.
+     */
+    private void throttleWriters() {
+        try {
+            Thread.sleep(1);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Adds an entry to the active skip list.
+     *
+     * <p>If the size limit has been reached, first tries to snapshot the skip list:
+     * when a snapshot is taken {@code cb} is notified, otherwise (e.g. a previous
+     * snapshot is still pending) the writer is throttled for 1 ms.
+     *
+     * <p>The payload is copied into the arena when the arena accepts its size;
+     * otherwise see newEntry() for how it is stored.
+     *
+     * @param ledgerId ledger id of the entry
+     * @param entryId entry id of the entry
+     * @param entry payload; its {@code remaining()} bytes are stored
+     * @param cb notified when a snapshot was taken because the size limit was reached
+     * @return number of payload bytes added, 0 if an entry with the same key was
+     *         already present in the active skip list
+     */
+    public long addEntry(long ledgerId, long entryId, final ByteBuffer entry, final CacheCallback cb)
+            throws IOException {
+        long size = 0;
+        if (isSizeLimitReached()) {
+            Checkpoint cp = snapshot();
+            if (null != cp) {
+                cb.onSizeLimitReached();
+            } else {
+                throttleWriters();
+            }
+        }
+
+        this.lock.readLock().lock();
+        try {
+            EntryKeyValue toAdd = cloneWithAllocator(ledgerId, entryId, entry);
+            size = internalAdd(toAdd);
+        } finally {
+            this.lock.readLock().unlock();
+        }
+
+        return size;
+    }
+
+    /**
+     * Internal version of add() that doesn't clone KVs with the
+     * allocator, and doesn't take the lock.
+     *
+     * Callers should ensure they already have the read lock taken
+     *
+     * @return length of {@code toAdd}, or 0 if kvmap already held an equal key
+     */
+    private long internalAdd(final EntryKeyValue toAdd) throws IOException {
+        long sizeChange = 0;
+        if (kvmap.putIfAbsent(toAdd, toAdd) == null) {
+            sizeChange = toAdd.getLength();
+            size.addAndGet(sizeChange);
+        }
+        return sizeChange;
+    }
+
+    /**
+     * Wraps {@code entry} without the arena; used when the arena rejects the size.
+     *
+     * <p>For array-backed buffers the backing array is referenced, not copied, and
+     * the buffer's position is left unchanged; other buffers are copied into a new
+     * array, consuming their remaining bytes.
+     */
+    private EntryKeyValue newEntry(long ledgerId, long entryId, final ByteBuffer entry) {
+        byte[] buf;
+        int offset = 0;
+        int length = entry.remaining();
+
+        if (entry.hasArray()) {
+            buf = entry.array();
+            // Readable bytes start at position(), which is relative to arrayOffset().
+            offset = entry.arrayOffset() + entry.position();
+        } else {
+            buf = new byte[length];
+            entry.get(buf);
+        }
+        return new EntryKeyValue(ledgerId, entryId, buf, offset, length);
+    }
+
+    /**
+     * Copies the remaining bytes of {@code entry} into a slice of the current arena,
+     * advancing the buffer's position, or falls back to newEntry() when the arena
+     * declines the allocation.
+     */
+    private EntryKeyValue cloneWithAllocator(long ledgerId, long entryId, final ByteBuffer entry) {
+        int len = entry.remaining();
+        SkipListArena.MemorySlice alloc = allocator.allocateBytes(len);
+        if (alloc == null) {
+            // The allocation was too large, allocator decided
+            // not to do anything with it.
+            return newEntry(ledgerId, entryId, entry);
+        }
+
+        assert alloc.getData() != null;
+        entry.get(alloc.getData(), alloc.getOffset(), len);
+        return new EntryKeyValue(ledgerId, entryId, alloc.getData(), alloc.getOffset(), len);
+    }
+
+    /**
+     * Find the entry with given key, looking in the active skip list first and
+     * then in the snapshot.
+     *
+     * @param ledgerId
+     * @param entryId
+     * @return the entry kv or null if none found.
+     */
+    public EntryKeyValue getEntry(long ledgerId, long entryId) throws IOException {
+        EntryKey key = new EntryKey(ledgerId, entryId);
+        EntryKeyValue value = null;
+        this.lock.readLock().lock();
+        try {
+            value = this.kvmap.get(key);
+            if (value == null) {
+                value = this.snapshot.get(key);
+            }
+        } finally {
+            this.lock.readLock().unlock();
+        }
+
+        return value;
+    }
+
+    /**
+     * Find the last entry with the given ledger id, looking in the active skip
+     * list first and then in the snapshot.
+     *
+     * @param ledgerId
+     * @return the entry kv or null if none found.
+     */
+    public EntryKeyValue getLastEntry(long ledgerId) throws IOException {
+        EntryKey result = null;
+        EntryKey key = new EntryKey(ledgerId, Long.MAX_VALUE);
+        this.lock.readLock().lock();
+        try {
+            result = this.kvmap.floorKey(key);
+            if (result == null || result.getLedgerId() != ledgerId) {
+                result = this.snapshot.floorKey(key);
+            }
+        } finally {
+            this.lock.readLock().unlock();
+        }
+
+        if (result == null || result.getLedgerId() != ledgerId) {
+            return null;
+        }
+        return (EntryKeyValue)result;
+    }
+
+    /**
+     * Check if the payload bytes added since the last snapshot reached the limit.
+     */
+    boolean isSizeLimitReached() {
+        return size.get() >= skipListSizeLimit;
+    }
+
+    /**
+     * Check if there is data in the mem-table.
+     *
+     * @return true if no payload bytes were added since the last snapshot and
+     *         no snapshot is pending
+     */
+    boolean isEmpty() {
+        return size.get() == 0 && snapshot.isEmpty();
+    }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Wed Mar 12 20:26:58 2014
@@ -46,22 +46,22 @@ class InterleavedLedgerStorage implement
private final static Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class);
// Hold the last checkpoint
- static class CheckpointHolder {
+ protected static class CheckpointHolder {
Checkpoint lastCheckpoint = Checkpoint.MAX;
- synchronized void setNextCheckpoint(Checkpoint cp) {
+ protected synchronized void setNextCheckpoint(Checkpoint cp) {
if (Checkpoint.MAX.equals(lastCheckpoint) || lastCheckpoint.compareTo(cp) < 0) {
lastCheckpoint = cp;
}
}
- synchronized void clearLastCheckpoint(Checkpoint done) {
+ protected synchronized void clearLastCheckpoint(Checkpoint done) {
if (0 == lastCheckpoint.compareTo(done)) {
lastCheckpoint = Checkpoint.MAX;
}
}
- synchronized Checkpoint getLastCheckpoint() {
+ protected synchronized Checkpoint getLastCheckpoint() {
return lastCheckpoint;
}
}
@@ -69,7 +69,7 @@ class InterleavedLedgerStorage implement
EntryLogger entryLogger;
LedgerCache ledgerCache;
private final CheckpointSource checkpointSource;
- private final CheckpointHolder checkpointHolder = new CheckpointHolder();
+ protected final CheckpointHolder checkpointHolder = new CheckpointHolder();
// A sorted map to stored all active ledger ids
protected final SnapshotMap<Long, Boolean> activeLedgers;
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListArena.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListArena.java?rev=1576883&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListArena.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListArena.java Wed Mar 12 20:26:58 2014
@@ -0,0 +1,260 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * SkipList allocation buffer to reduce memory fragmentation.
+ * Adapted from HBase project.
+ * <p>
+ * The SkipListArena is basically a bump-the-pointer allocator that allocates
+ * big (size configured by {@code getSkipListArenaChunkSize()}) byte[] chunks
+ * and then hands out slices of them to the threads that request memory.
+ * <p>
+ * The purpose of this class is to combat heap fragmentation in the
+ * bookie. By ensuring that all KeyValues in a given SkipList refer
+ * only to large chunks of contiguous memory, we ensure that large blocks
+ * get freed up when the SkipList is flushed.
+ * <p>
+ * Without the Arena, the byte arrays allocated during insertion end up
+ * interleaved throughout the heap, and the old generation gets progressively
+ * more fragmented until a stop-the-world compacting collection occurs.
+ * <p>
+ * Allocation does not take locks: the current chunk and the offset within it
+ * are advanced with compare-and-set, so concurrent writers may share an arena.
+ */
+public class SkipListArena {
+    private final AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
+
+    final int chunkSize;
+
+    final int maxAlloc;
+
+    /**
+     * @param cfg supplies the chunk size and the maximum size served from a chunk
+     * @throws IllegalArgumentException if the maximum allocation size is larger
+     *         than the chunk size; such allocations could never fit into a chunk
+     *         and {@link #allocateBytes(int)} would retry forever
+     */
+    public SkipListArena(ServerConfiguration cfg) {
+        chunkSize = cfg.getSkipListArenaChunkSize();
+        maxAlloc = cfg.getSkipListArenaMaxAllocSize();
+        // An explicit check rather than an assert: with assertions disabled a
+        // misconfiguration would otherwise turn into an endless allocation loop.
+        if (maxAlloc > chunkSize) {
+            throw new IllegalArgumentException("Max alloc size " + maxAlloc
+                    + " must not be larger than chunk size " + chunkSize);
+        }
+    }
+
+    /**
+     * Allocate a slice of the given length.
+     *
+     * If the size is larger than the maximum size specified for this
+     * allocator, returns null.
+     *
+     * @param size number of bytes to allocate; must not be negative
+     * @return a slice of {@code size} bytes, or null if the caller should
+     *         allocate the memory itself
+     */
+    public MemorySlice allocateBytes(int size) {
+        assert size >= 0;
+
+        // Callers should satisfy large allocations directly from JVM since they
+        // don't cause fragmentation as badly.
+        if (size > maxAlloc) {
+            return null;
+        }
+
+        while (true) {
+            Chunk c = getCurrentChunk();
+
+            // Try to allocate from this chunk
+            int allocOffset = c.alloc(size);
+            if (allocOffset != -1) {
+                // We succeeded - this is the common case - small alloc
+                // from a big buffer
+                return new MemorySlice(c.data, allocOffset);
+            }
+
+            // not enough space!
+            // try to retire this chunk
+            retireCurrentChunk(c);
+        }
+    }
+
+    /**
+     * Try to retire the current chunk if it is still there.
+     */
+    private void retireCurrentChunk(Chunk c) {
+        curChunk.compareAndSet(c, null);
+        // If the CAS fails, that means that someone else already
+        // retired the chunk for us.
+    }
+
+    /**
+     * Get the current chunk, or, if there is no current chunk,
+     * allocate a new one from the JVM.
+     */
+    private Chunk getCurrentChunk() {
+        while (true) {
+            // Try to get the chunk
+            Chunk c = curChunk.get();
+            if (c != null) {
+                return c;
+            }
+
+            // No current chunk, so we want to allocate one. We race
+            // against other allocators to CAS in an uninitialized chunk
+            // (which is cheap to allocate)
+            c = new Chunk(chunkSize);
+            if (curChunk.compareAndSet(null, c)) {
+                c.init();
+                return c;
+            }
+            // lost race
+        }
+    }
+
+    /**
+     * A chunk of memory out of which allocations are sliced.
+     */
+    private static class Chunk {
+        /** Actual underlying data; null until {@link #init()} succeeds. */
+        private byte[] data;
+
+        private static final int UNINITIALIZED = -1;
+        private static final int OOM = -2;
+        /**
+         * Offset for the next allocation, or one of the sentinel values
+         * {@link #UNINITIALIZED} (memory not claimed yet) or {@link #OOM}
+         * (claiming the memory failed with an OutOfMemoryError).
+         */
+        private final AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
+
+        /** Total number of allocations satisfied from this buffer */
+        private final AtomicInteger allocCount = new AtomicInteger();
+
+        /** Size of chunk in bytes */
+        private final int size;
+
+        /**
+         * Create an uninitialized chunk. Note that memory is not allocated yet, so
+         * this is cheap.
+         * @param size in bytes
+         */
+        private Chunk(int size) {
+            this.size = size;
+        }
+
+        /**
+         * Actually claim the memory for this chunk. This should only be called from
+         * the thread that constructed the chunk. It is thread-safe against other
+         * threads calling alloc(), who will spin until the allocation is complete.
+         */
+        public void init() {
+            assert nextFreeOffset.get() == UNINITIALIZED;
+            try {
+                data = new byte[size];
+            } catch (OutOfMemoryError e) {
+                boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+                assert failInit; // should be true.
+                throw e;
+            }
+            // Mark that it's ready for use
+            boolean okInit = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
+            assert okInit; // single-threaded call
+        }
+
+        /**
+         * Try to allocate <code>size</code> bytes from the chunk.
+         * @return the offset of the successful allocation, or -1 to indicate
+         *         not-enough-space (or a chunk whose initialization failed)
+         */
+        public int alloc(int size) {
+            while (true) {
+                int oldOffset = nextFreeOffset.get();
+                if (oldOffset == UNINITIALIZED) {
+                    // Other thread allocating it right now
+                    Thread.yield();
+                    continue;
+                }
+                if (oldOffset == OOM) {
+                    return -1;
+                }
+
+                if (oldOffset + size > data.length) {
+                    return -1; // alloc doesn't fit
+                }
+
+                // Try to atomically claim this chunk
+                if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
+                    // we got the alloc
+                    allocCount.incrementAndGet();
+                    return oldOffset;
+                }
+                // lost race
+            }
+        }
+
+        @Override
+        public String toString() {
+            // data is null while the chunk is uninitialized or failed to initialize.
+            int offset = nextFreeOffset.get(); // read before data, which init() publishes via this CAS
+            byte[] d = data;
+            String free = (d == null || offset < 0) ? "n/a" : String.valueOf(d.length - offset);
+            return "Chunk@" + System.identityHashCode(this) +
+                ": used(" + allocCount.get() + "), free(" + free + ")";
+        }
+    }
+
+    /**
+     * The result of a single allocation. Contains the backing array of the chunk
+     * the allocation points into, and the offset in this array where the slice
+     * begins. The slice length is known to the caller that requested it.
+     */
+    public static class MemorySlice {
+        private final byte[] data;
+        private final int offset;
+
+        private MemorySlice(byte[] data, int off) {
+            this.data = data;
+            this.offset = off;
+        }
+
+        @Override
+        public String toString() {
+            return "Slice:" + "capacity(" + data.length + "), offset(" + offset + ")";
+        }
+
+        byte[] getData() {
+            return data;
+        }
+
+        int getOffset() {
+            return offset;
+        }
+    }
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java?rev=1576883&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java Wed Mar 12 20:26:58 2014
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Receives the entries of a skip list, one at a time, when it is flushed.
+ */
+public interface SkipListFlusher {
+    /**
+     * Process an entry.
+     *
+     * @param ledgerId
+     *          Ledger ID.
+     * @param entryId
+     *          The entry id of this entry.
+     * @param entry
+     *          Entry payload; its remaining bytes make up the entry.
+     * @throws IOException if the entry could not be processed. EntryMemTable
+     *          catches {@link Bookie.NoLedgerException} and skips the
+     *          subsequent entries of that ledger.
+     */
+    void process(long ledgerId, long entryId, ByteBuffer entry) throws IOException;
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java?rev=1576883&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java Wed Mar 12 20:26:58 2014
@@ -0,0 +1,185 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link LedgerStorage} that buffers added entries in an {@link EntryMemTable}
+ * and hands them to the underlying {@link InterleavedLedgerStorage} when the
+ * mem-table is flushed.
+ *
+ * <p>The mem-table is flushed on {@link #flush()}, on
+ * {@link #checkpoint(Checkpoint)}, and asynchronously on a dedicated thread when
+ * the mem-table invokes {@link #onSizeLimitReached()}.
+ */
+public class SortedLedgerStorage extends InterleavedLedgerStorage
+        implements LedgerStorage, CacheCallback, SkipListFlusher {
+    private final static Logger LOG = LoggerFactory.getLogger(SortedLedgerStorage.class);
+
+    private final EntryMemTable memTable;
+    // Single thread that runs the mem-table flushes scheduled by onSizeLimitReached().
+    private final ScheduledExecutorService scheduler;
+
+    public SortedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
+                               LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
+                               final CheckpointSource checkpointSource)
+            throws IOException {
+        // The checkpoint source is given to the mem-table; the superclass receives null.
+        // NOTE(review): presumably checkpoints are driven by mem-table flushes
+        // (see checkpoint()) rather than by the superclass -- confirm.
+        super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, null);
+        this.memTable = new EntryMemTable(conf, checkpointSource);
+        // Flusher thread runs between normal and max priority; presumably so background
+        // flushes keep up with incoming adds.
+        this.scheduler = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder()
+                        .setNameFormat("SortedLedgerStorage-%d")
+                        .setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) / 2).build());
+    }
+
+    /**
+     * Flush whatever is buffered in the mem-table, then start the underlying storage.
+     * A flush failure is logged and does not prevent startup.
+     */
+    @Override
+    public void start() {
+        try {
+            flush();
+        } catch (IOException e) {
+            LOG.error("Exception thrown while flushing ledger cache.", e);
+        }
+        super.start();
+    }
+
+    /**
+     * Stop the background flusher, waiting up to 3 seconds for scheduled flushes to
+     * finish, then shut down the underlying storage. The underlying storage is shut
+     * down even if waiting for the flusher is interrupted.
+     *
+     * @throws InterruptedException if interrupted while waiting for the flusher or
+     *         while shutting down the underlying storage.
+     */
+    @Override
+    public void shutdown() throws InterruptedException {
+        // Wait for any jobs currently scheduled to be completed and then shut down.
+        scheduler.shutdown();
+        try {
+            if (!scheduler.awaitTermination(3, TimeUnit.SECONDS)) {
+                scheduler.shutdownNow();
+            }
+        } catch (InterruptedException ie) {
+            // Stop waiting, but do not leave the flusher thread running.
+            scheduler.shutdownNow();
+            throw ie;
+        } finally {
+            super.shutdown();
+        }
+    }
+
+    /**
+     * Check whether a ledger exists in the underlying storage or in the mem-table.
+     */
+    @Override
+    public boolean ledgerExists(long ledgerId) throws IOException {
+        // Done this way because checking the skip list is an O(logN) operation compared to
+        // the O(1) for the ledgerCache.
+        if (!super.ledgerExists(ledgerId)) {
+            EntryKeyValue kv = memTable.getLastEntry(ledgerId);
+            if (null == kv) {
+                // Ask the underlying storage again: the ledger's entries may have been
+                // flushed out of the mem-table after the first check.
+                return super.ledgerExists(ledgerId);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Buffer an entry in the mem-table. The ledger id and entry id are read from the
+     * first two longs of the buffer, which is rewound before being handed over.
+     *
+     * @return the entry id read from the buffer.
+     */
+    @Override
+    public long addEntry(ByteBuffer entry) throws IOException {
+        long ledgerId = entry.getLong();
+        long entryId = entry.getLong();
+        entry.rewind();
+        memTable.addEntry(ledgerId, entryId, entry, this);
+        return entryId;
+    }
+
+    /**
+     * Get the last entry (the whole entry, not only its id) of a ledger: from the
+     * mem-table if present there, otherwise from the underlying storage.
+     *
+     * @param ledgerId ledger to look up.
+     * @return buffer holding the last entry.
+     */
+    private ByteBuffer getLastEntryId(long ledgerId) throws IOException {
+        EntryKeyValue kv = memTable.getLastEntry(ledgerId);
+        if (null != kv) {
+            return kv.getValueAsByteBuffer();
+        }
+        // If it doesn't exist in the skip list, then fallback to the ledger cache+index.
+        return super.getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
+    }
+
+    /**
+     * Read an entry, looking first in the underlying storage and then in the mem-table.
+     * {@link BookieProtocol#LAST_ADD_CONFIRMED} returns the ledger's last entry.
+     *
+     * @throws Bookie.NoEntryException if the entry is found in neither place.
+     */
+    @Override
+    public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
+        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+            return getLastEntryId(ledgerId);
+        }
+        ByteBuffer buffToRet;
+        try {
+            buffToRet = super.getEntry(ledgerId, entryId);
+        } catch (Bookie.NoEntryException nee) {
+            EntryKeyValue kv = memTable.getEntry(ledgerId, entryId);
+            if (null == kv) {
+                // The entry might have been flushed since we last checked, so query the ledger cache again.
+                // If the entry truly doesn't exist, then this will throw a NoEntryException
+                buffToRet = super.getEntry(ledgerId, entryId);
+            } else {
+                buffToRet = kv.getValueAsByteBuffer();
+            }
+        }
+        // buffToRet will not be null when we reach here.
+        return buffToRet;
+    }
+
+    /**
+     * Flush the mem-table up to {@code checkpoint}, then checkpoint the underlying
+     * storage. If {@code checkpoint} is older than the last recorded checkpoint,
+     * nothing is flushed and the last checkpoint is returned.
+     */
+    @Override
+    public Checkpoint checkpoint(final Checkpoint checkpoint) throws IOException {
+        Checkpoint lastCheckpoint = checkpointHolder.getLastCheckpoint();
+        // if checkpoint is less than last checkpoint, we don't need to do checkpoint again.
+        if (lastCheckpoint.compareTo(checkpoint) > 0) {
+            return lastCheckpoint;
+        }
+        memTable.flush(this, checkpoint);
+        return super.checkpoint(checkpoint);
+    }
+
+    /**
+     * {@link SkipListFlusher} callback: pass an entry drained from the mem-table to the
+     * superclass' {@code processEntry} (with {@code false} as the last argument; see
+     * InterleavedLedgerStorage for its meaning).
+     */
+    @Override
+    public void process(long ledgerId, long entryId,
+                        ByteBuffer buffer) throws IOException {
+        processEntry(ledgerId, entryId, buffer, false);
+    }
+
+    /**
+     * Flush the whole mem-table (up to {@link Checkpoint#MAX}), then flush the
+     * underlying storage.
+     */
+    @Override
+    public void flush() throws IOException {
+        memTable.flush(this, Checkpoint.MAX);
+        super.flush();
+    }
+
+    // CacheCallback functions.
+    /**
+     * Schedule an asynchronous mem-table flush on the flusher thread, rolling the
+     * entry log afterwards if it reached its size limit.
+     */
+    @Override
+    public void onSizeLimitReached() throws IOException {
+        // when size limit reached, we get the previous checkpoint from snapshot mem-table.
+        // at this point, we are safer to schedule a checkpoint, since the entries added before
+        // this checkpoint already written to entry logger.
+        // but it would be better not to let mem-table flush to different entry log files,
+        // so we roll entry log files in SortedLedgerStorage itself.
+        // After that, we could make the process writing data to entry logger file not bound with checkpoint.
+        // otherwise, it hurts add performance.
+        scheduler.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Started flushing mem table.");
+                    memTable.flush(SortedLedgerStorage.this);
+                    if (entryLogger.reachEntryLogLimit(0)) {
+                        LOG.info("Rolling entry logger since it reached size limitation");
+                        entryLogger.rollLog();
+                    }
+                } catch (IOException e) {
+                    // TODO: if we failed to flush data, we should switch the bookie back to readonly mode
+                    // or shutdown it.
+                    LOG.error("Exception thrown while flushing skip list cache.", e);
+                } catch (RuntimeException e) {
+                    // submit() stores unchecked exceptions in a Future nobody reads;
+                    // log them here so a failed flush is not silently lost.
+                    LOG.error("Unexpected exception thrown while flushing skip list cache.", e);
+                }
+            }
+        });
+    }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Wed Mar 12 20:26:58 2014
@@ -21,11 +21,9 @@ import java.io.File;
import java.util.List;
import com.google.common.annotations.Beta;
-
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang.StringUtils;
-import com.google.common.annotations.Beta;
-
/**
* Configuration manages server-side settings
*/
@@ -95,6 +93,11 @@ public class ServerConfiguration extends
protected final static String READ_BUFFER_SIZE = "readBufferSizeBytes";
protected final static String WRITE_BUFFER_SIZE = "writeBufferSizeBytes";
+ // Flag read by getSortedLedgerStorageEnabled() (default true).
+ protected final static String SORTED_LEDGER_STORAGE_ENABLED = "sortedLedgerStorageEnabled";
+ // Skip list data size limit read by getSkipListSizeLimit() (default 64MB).
+ protected final static String SKIP_LIST_SIZE_LIMIT = "skipListSizeLimit";
+ // SkipListArena chunk size in bytes, read by getSkipListArenaChunkSize() (default 4MB).
+ protected final static String SKIP_LIST_CHUNK_SIZE_ENTRY = "skipListArenaChunkSize";
+ // SkipListArena max allocation size in bytes, read by getSkipListArenaMaxAllocSize()
+ // (default 128KB); validate() rejects values larger than the chunk size.
+ protected final static String SKIP_LIST_MAX_ALLOC_ENTRY = "skipListArenaMaxAllocSize";
+
/**
* Construct a default configuration object
*/
@@ -732,7 +735,7 @@ public class ServerConfiguration extends
* that ledger.
*
* @see org.apache.bookkeeper.client.BookKeeper#openLedger
- *
+ *
* @param waitTime time to wait before replicating ledger fragment
*/
public void setOpenLedgerRereplicationGracePeriod(String waitTime) {
@@ -743,7 +746,7 @@ public class ServerConfiguration extends
* Get the grace period which the rereplication worker to wait before
* fencing and rereplicating a ledger fragment which is still being written
* to, on bookie failure.
- *
+ *
* @return long
*/
public long getOpenLedgerRereplicationGracePeriod() {
@@ -856,6 +859,76 @@ public class ServerConfiguration extends
return getInt(NUM_JOURNAL_CALLBACK_THREADS, 1);
}
+    /**
+     * Enable or disable sorted ledger storage.
+     *
+     * @param enabled whether sorted ledger storage is enabled.
+     * @return this server configuration, for chaining.
+     */
+    public ServerConfiguration setSortedLedgerStorageEnabled(boolean enabled) {
+        setProperty(SORTED_LEDGER_STORAGE_ENABLED, enabled);
+        return this;
+    }
+
+    /**
+     * Check whether sorted ledger storage is enabled. Defaults to {@code true}.
+     *
+     * @return true if sorted ledger storage is enabled.
+     */
+    public boolean getSortedLedgerStorageEnabled() {
+        final boolean enabledByDefault = true;
+        return getBoolean(SORTED_LEDGER_STORAGE_ENABLED, enabledByDefault);
+    }
+
+    /**
+     * Get the skip list data size limit. Defaults to 64MB.
+     *
+     * @return skip list data size limit, in bytes.
+     */
+    public long getSkipListSizeLimit() {
+        final long defaultLimitBytes = 64L * 1024 * 1024;
+        return getLong(SKIP_LIST_SIZE_LIMIT, defaultLimitBytes);
+    }
+
+    /**
+     * Set skip list size limit.
+     *
+     * @param size skip list size limit, in bytes.
+     * @return server configuration object.
+     */
+    public ServerConfiguration setSkipListSizeLimit(int size) {
+        setProperty(SKIP_LIST_SIZE_LIMIT, size);
+        return this;
+    }
+
+    /**
+     * Set skip list size limit. Unlike {@link #setSkipListSizeLimit(int)}, this accepts
+     * limits above {@link Integer#MAX_VALUE}, matching the {@code long} returned by
+     * {@link #getSkipListSizeLimit()}.
+     *
+     * @param size skip list size limit, in bytes.
+     * @return server configuration object.
+     */
+    public ServerConfiguration setSkipListSizeLimit(long size) {
+        setProperty(SKIP_LIST_SIZE_LIMIT, size);
+        return this;
+    }
+
+    /**
+     * Get the number of bytes used as the chunk allocation size for the
+     * {@link org.apache.bookkeeper.bookie.SkipListArena}. Defaults to 4MB.
+     *
+     * @return arena chunk size, in bytes.
+     */
+    public int getSkipListArenaChunkSize() {
+        final int defaultChunkBytes = 4 * 1024 * 1024;
+        return getInt(SKIP_LIST_CHUNK_SIZE_ENTRY, defaultChunkBytes);
+    }
+
+    /**
+     * Set the number of bytes we use as the chunk allocation size for the
+     * {@link org.apache.bookkeeper.bookie.SkipListArena}.
+     *
+     * @param size chunk size, in bytes.
+     * @return server configuration object.
+     */
+    public ServerConfiguration setSkipListArenaChunkSize(int size) {
+        this.setProperty(SKIP_LIST_CHUNK_SIZE_ENTRY, size);
+        return this;
+    }
+
+    /**
+     * Get the max allocation size for the
+     * {@link org.apache.bookkeeper.bookie.SkipListArena}; memory allocation for larger
+     * requests is delegated to the VM. Defaults to 128 KB. {@link #validate()} requires
+     * it not to exceed {@link #getSkipListArenaChunkSize()}.
+     *
+     * @return arena max allocation size, in bytes.
+     */
+    public int getSkipListArenaMaxAllocSize() {
+        return getInt(SKIP_LIST_MAX_ALLOC_ENTRY, 128 * 1024);
+    }
+
+    /**
+     * Set the max allocation size for the
+     * {@link org.apache.bookkeeper.bookie.SkipListArena}. It must not exceed the arena
+     * chunk size; {@link #validate()} rejects configurations where it does.
+     *
+     * @param size max allocation size, in bytes.
+     * @return server configuration object.
+     */
+    public ServerConfiguration setSkipListArenaMaxAllocSize(int size) {
+        setProperty(SKIP_LIST_MAX_ALLOC_ENTRY, size);
+        return this;
+    }
+
/**
* Should we group journal force writes
@@ -940,10 +1013,10 @@ public class ServerConfiguration extends
* Set whether the bookie is able to go into read-only mode.
* If this is set to false, the bookie will shutdown on encountering
* an error condition.
- *
+ *
* @param enabled whether to enable read-only mode.
- *
- * @return ServerConfiguration
+ *
+ * @return ServerConfiguration
*/
public ServerConfiguration setReadOnlyModeEnabled(boolean enabled) {
setProperty(READ_ONLY_MODE_ENABLED, enabled);
@@ -952,7 +1025,7 @@ public class ServerConfiguration extends
/**
* Get whether read-only mode is enabled. The default is false.
- *
+ *
* @return boolean
*/
public boolean isReadOnlyModeEnabled() {
@@ -983,9 +1056,9 @@ public class ServerConfiguration extends
/**
* Set the Disk free space threshold as a fraction of the total
* after which disk will be considered as full during disk check.
- *
+ *
* @param threshold threshold to declare a disk full
- *
+ *
* @return ServerConfiguration
*/
public ServerConfiguration setDiskUsageThreshold(float threshold) {
@@ -995,7 +1068,7 @@ public class ServerConfiguration extends
/**
* Returns disk free space threshold. By default it is 0.95.
- *
+ *
* @return float
*/
public float getDiskUsageThreshold() {
@@ -1004,9 +1077,9 @@ public class ServerConfiguration extends
/**
* Set the disk checker interval to monitor ledger disk space
- *
+ *
* @param interval interval between disk checks for space.
- *
+ *
* @return ServerConfiguration
*/
public ServerConfiguration setDiskCheckInterval(int interval) {
@@ -1016,7 +1089,7 @@ public class ServerConfiguration extends
/**
* Get the disk checker interval
- *
+ *
* @return int
*/
public int getDiskCheckInterval() {
@@ -1140,4 +1213,14 @@ public class ServerConfiguration extends
setProperty(JOURNAL_REMOVE_FROM_PAGE_CACHE, enabled);
return this;
}
+
+    /**
+     * Validate the configuration.
+     *
+     * @throws ConfigurationException if the SkipListArena max allocation size is
+     *         larger than the arena chunk size.
+     */
+    public void validate() throws ConfigurationException {
+        int chunkSize = getSkipListArenaChunkSize();
+        int maxAllocSize = getSkipListArenaMaxAllocSize();
+        // Equal values are accepted; only a max allocation larger than a chunk is rejected.
+        if (chunkSize < maxAllocSize) {
+            throw new ConfigurationException("Arena max allocation size (" + maxAllocSize
+                    + ") should not be larger than the chunk size (" + chunkSize + ").");
+        }
+    }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Wed Mar 12 20:26:58 2014
@@ -262,6 +262,7 @@ public class BookieServer {
throws IllegalArgumentException {
try {
conf.loadConf(new File(confFile).toURI().toURL());
+ conf.validate();
} catch (MalformedURLException e) {
LOG.error("Could not open configuration file: " + confFile, e);
throw new IllegalArgumentException();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml Wed Mar 12 20:26:58 2014
@@ -20,4 +20,17 @@
<!-- generated code, we can't be held responsible for findbugs in it //-->
<Class name="~org\.apache\.bookkeeper\.proto\.DataFormats.*" />
</Match>
+ <Match>
+ <!-- It is safe to store a reference to external bytes here, since the
+ bytes come from a slab. //-->
+ <Class name="org.apache.bookkeeper.bookie.EntryKeyValue" />
+ <Bug pattern="EI_EXPOSE_REP2" />
+ </Match>
+ <Match>
+ <!-- It is safe for getBuffer to return a reference to the internal bytes,
+ since they come from a slab. //-->
+ <Class name="org.apache.bookkeeper.bookie.EntryKeyValue" />
+ <Method name="getBuffer" />
+ <Bug pattern="EI_EXPOSE_REP" />
+ </Match>
</FindBugsFilter>
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java Wed Mar 12 20:26:58 2014
@@ -102,6 +102,7 @@ public class CompactionTest extends Book
baseConf.setMinorCompactionInterval(minorCompactionInterval);
baseConf.setMajorCompactionInterval(majorCompactionInterval);
baseConf.setEntryLogFilePreAllocationEnabled(false);
+ baseConf.setSortedLedgerStorageEnabled(false);
super.setUp();
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java Wed Mar 12 20:26:58 2014
@@ -211,12 +211,9 @@ public class EntryLogTest extends TestCa
bookie.getLedgerDirsManager().addToFilledDirs(entryLogger.currentDir);
ledgerStorage.addEntry(generateEntry(3, 1));
// Verify written entries
- Assert.assertArrayEquals(generateEntry(1, 1).array(), ledgerStorage
- .getEntry(1, 1).array());
- Assert.assertArrayEquals(generateEntry(2, 1).array(), ledgerStorage
- .getEntry(2, 1).array());
- Assert.assertArrayEquals(generateEntry(3, 1).array(), ledgerStorage
- .getEntry(3, 1).array());
+ Assert.assertTrue(0 == generateEntry(1, 1).compareTo(ledgerStorage.getEntry(1, 1)));
+ Assert.assertTrue(0 == generateEntry(2, 1).compareTo(ledgerStorage.getEntry(2, 1)));
+ Assert.assertTrue(0 == generateEntry(3, 1).compareTo(ledgerStorage.getEntry(3, 1)));
}
@After
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java Wed Mar 12 20:26:58 2014
@@ -220,7 +220,7 @@ public class LedgerCacheTest extends Tes
// flush all first to clean previous dirty ledgers
ledgerCache.flushLedger(true);
- // flush all
+ // flush all
ledgerCache.flushLedger(true);
// delete serveral ledgers
@@ -320,7 +320,8 @@ public class LedgerCacheTest extends Tes
.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setFlushInterval(1000)
- .setPageLimit(1);
+ .setPageLimit(1)
+ .setSortedLedgerStorageEnabled(false);
Bookie b = new Bookie(conf);
b.start();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java Wed Mar 12 20:26:58 2014
@@ -58,6 +58,7 @@ public class LedgerDeleteTest extends Mu
baseConf.setEntryLogSizeLimit(2 * 1024 * 1024L);
baseConf.setGcWaitTime(1000);
baseConf.setEntryLogFilePreAllocationEnabled(false);
+ baseConf.setSortedLedgerStorageEnabled(false);
super.setUp();
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java?rev=1576883&r1=1576882&r2=1576883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java Wed Mar 12 20:26:58 2014
@@ -39,6 +39,7 @@ public class ReadOnlyBookieTest extends
public ReadOnlyBookieTest() {
super(2);
+ // Sorted ledger storage defaults to enabled (getSortedLedgerStorageEnabled() returns
+ // true); turn it off for this test. NOTE(review): reason not shown in this diff --
+ // presumably the read-only checks assume entries go straight to the entry logger; confirm.
+ baseConf.setSortedLedgerStorageEnabled(false);
baseConf.setEntryLogFilePreAllocationEnabled(false);
}
|