bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eolive...@apache.org
Subject [bookkeeper] branch master updated: Add missed tests for SortedLedgerStorage
Date Tue, 04 Jul 2017 06:24:44 GMT
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new b59d63e  Add missed tests for SortedLedgerStorage
b59d63e is described below

commit b59d63e8be237e80872f73d92a31b4213d9a649b
Author: Sijie Guo <sijie@apache.org>
AuthorDate: Tue Jul 4 08:24:28 2017 +0200

    Add missed tests for SortedLedgerStorage
    
    Added two missed tests for SortedLedgerStorage. They were missed when we (Twitter)
    merged SortedLedgerStorage back.
    
    Author: Sijie Guo <sijie@apache.org>
    
    Reviewers: Enrico Olivelli, Jia Zhai
    
    This closes #227 from sijie/more_memory_table_related_tests
---
 .../java/org/apache/bookkeeper/bookie/LogMark.java |   2 +
 .../bookkeeper/bookie/TestEntryMemTable.java       | 263 +++++++++++++++++++++
 .../bookkeeper/bookie/TestSkipListArena.java       | 202 ++++++++++++++++
 3 files changed, 467 insertions(+)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
index 4bf1e05..7aa2850 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
@@ -28,6 +28,8 @@ class LogMark {
     long logFileId;
     long logFileOffset;
 
+    public static final LogMark MAX_VALUE = new LogMark(Long.MAX_VALUE, Long.MAX_VALUE);
+
     public LogMark() {
         setLogMark(0, 0);
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
new file mode 100644
index 0000000..0ca108e
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashSet;
+
+import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Test;
+import org.junit.Before;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+public class TestEntryMemTable implements CacheCallback, SkipListFlusher, CheckpointSource {
+
+    private EntryMemTable memTable;
+    private final Random random = new Random();
+    private TestCheckPoint curCheckpoint = new TestCheckPoint(0, 0);
+
+    /**
+     * Returns the test's current checkpoint, {@code curCheckpoint}. The same
+     * mutable instance is returned on every call; tests move it forward with
+     * {@code setCheckPoint}.
+     */
+    @Override
+    public Checkpoint newCheckpoint() {
+        return curCheckpoint;
+    }
+
+    /** No-op: this test does nothing when a checkpoint is reported complete. */
+    @Override
+    public void checkpointComplete(Checkpoint checkpoint, boolean compact)
+            throws IOException {
+    }
+
+    /**
+     * Creates a fresh {@link EntryMemTable} before each test, built from a test
+     * server configuration, this test instance and a null stats logger.
+     */
+    @Before
+    public void setUp() throws Exception {
+        memTable = new EntryMemTable(
+                TestBKConfiguration.newServerConfiguration(), this, NullStatsLogger.INSTANCE);
+    }
+
+    /**
+     * Checks LogMark ordering through {@code compare} and that a mark written
+     * with {@code writeLogMark} is restored by {@code readLogMark}.
+     * @throws IOException
+     */
+    @Test
+    public void testLogMark() throws IOException {
+        LogMark mark = new LogMark();
+        // assertEquals reports the actual comparison result on failure,
+        // which assertTrue(x == 0) cannot do.
+        assertEquals(0, mark.compare(new LogMark()));
+        assertTrue("default mark should sort before MAX_VALUE",
+                mark.compare(LogMark.MAX_VALUE) < 0);
+        mark.setLogMark(3, 11);
+        byte[] data = new byte[16];
+        ByteBuffer buf = ByteBuffer.wrap(data);
+        mark.writeLogMark(buf);
+        // Switch the buffer from writing to reading before handing it to readLogMark.
+        buf.flip();
+        LogMark mark1 = new LogMark(9, 13);
+        assertTrue("(9, 13) should sort after (3, 11)", mark1.compare(mark) > 0);
+        mark1.readLogMark(buf);
+        assertEquals(0, mark1.compare(mark));
+    }
+
+    /**
+     * Basic put/get: an entry added to the mem-table is returned by
+     * {@code getEntry} with the same ledger id, entry id and payload.
+     * @throws IOException
+     */
+    @Test
+    public void testBasicOps() throws IOException {
+        long ledgerId = 1;
+        long entryId = 1;
+        byte[] data = new byte[10];
+        random.nextBytes(data);
+        ByteBuffer buf = ByteBuffer.wrap(data);
+        memTable.addEntry(ledgerId, entryId, buf, this);
+        // Rewind so the payload comparison starts at position 0 regardless of
+        // what addEntry did to the buffer's position.
+        buf.rewind();
+        EntryKeyValue kv = memTable.getEntry(ledgerId, entryId);
+        // Fail with a readable message instead of a NullPointerException below.
+        assertNotNull("entry " + ledgerId + ":" + entryId + " not found in mem-table", kv);
+        assertEquals(ledgerId, kv.getLedgerId());
+        assertEquals(entryId, kv.getEntryId());
+        assertEquals(buf, kv.getValueAsByteBuffer().nioBuffer());
+        memTable.flush(this);
+    }
+
+    /** No-op: this test takes no action when the size-limit callback fires. */
+    @Override
+    public void onSizeLimitReached() throws IOException {
+        // No-op
+    }
+
+    /**
+     * No-op {@link SkipListFlusher} callback: entries flushed through this test
+     * instance are discarded.
+     */
+    @Override
+    public void process(long ledgerId, long entryId, ByteBuffer entry)
+            throws IOException {
+        // No-op
+    }
+
+    /**
+     * Test read/write across snapshot: entries stay readable through
+     * {@code getEntry} after snapshots were taken at random points while
+     * they were being added.
+     * @throws IOException
+     */
+    @Test
+    public void testScanAcrossSnapshot() throws IOException {
+        byte[] data = new byte[10];
+        List<EntryKeyValue> keyValues = new ArrayList<EntryKeyValue>();
+        for (long entryId = 1; entryId < 100; entryId++) {
+            for (long ledgerId = 1; ledgerId < 3; ledgerId++) {
+                // The same array is refilled and re-wrapped for every entry.
+                random.nextBytes(data);
+                memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data), this);
+                keyValues.add(memTable.getEntry(ledgerId, entryId));
+                // Take a snapshot after roughly one in sixteen additions.
+                if (random.nextInt(16) == 0) {
+                    memTable.snapshot();
+                }
+            }
+        }
+
+        for (EntryKeyValue kv : keyValues) {
+            // assertEquals prints both values on mismatch and reports a missing
+            // entry as a failure rather than a NullPointerException.
+            assertEquals(kv, memTable.getEntry(kv.getLedgerId(), kv.getEntryId()));
+        }
+        memTable.flush(this, Checkpoint.MAX);
+    }
+
+    /**
+     * Flusher that records every flushed entry in the supplied set and fails
+     * the test when the set already contains an equal entry.
+     * Static because it needs no state from the enclosing test instance.
+     */
+    private static class KVFLusher implements SkipListFlusher {
+        private final HashSet<EntryKeyValue> keyValues;
+
+        KVFLusher(final HashSet<EntryKeyValue> keyValues) {
+            this.keyValues = keyValues;
+        }
+
+        @Override
+        public void process(long ledgerId, long entryId, ByteBuffer entry) throws IOException {
+            // Set.add returns false when an equal element is already present.
+            assertTrue(ledgerId + ":" + entryId + " is duplicate in store!",
+                    keyValues.add(new EntryKeyValue(ledgerId, entryId, entry.array())));
+        }
+    }
+
+    /**
+     * Flusher that rejects every entry by throwing {@link NoLedgerException}
+     * for the entry's ledger id.
+     * Static because it needs no state from the enclosing test instance.
+     */
+    private static class NoLedgerFLusher implements SkipListFlusher {
+        @Override
+        public void process(long ledgerId, long entryId, ByteBuffer entry) throws IOException {
+            throw new NoLedgerException(ledgerId);
+        }
+    }
+
+    /**
+     * Test flush w/ logMark parameter: exercises {@code snapshot(checkpoint)}
+     * and {@code flush(flusher[, checkpoint])} with checkpoints on either side
+     * of the test's current checkpoint.
+     * @throws IOException
+     */
+    @Test
+    public void testFlushLogMark() throws IOException {
+        HashSet<EntryKeyValue> flushedKVs = new HashSet<EntryKeyValue>();
+        KVFLusher flusher = new KVFLusher(flushedKVs);
+
+        curCheckpoint.setCheckPoint(2, 2);
+
+        byte[] data = new byte[10];
+        long ledgerId = 100;
+        for (long entryId = 1; entryId < 100; entryId++) {
+            random.nextBytes(data);
+            memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data), this);
+        }
+
+        // With the current checkpoint at (2, 2), a snapshot request at (1, 1)
+        // is expected to yield nothing while one at (3, 3) yields a snapshot.
+        assertNull(memTable.snapshot(new TestCheckPoint(1, 1)));
+        assertNotNull(memTable.snapshot(new TestCheckPoint(3, 3)));
+
+        // The first flush writes the snapshot; an immediate second flush writes nothing.
+        assertTrue("first flush should write the snapshot", 0 < memTable.flush(flusher));
+        assertEquals(0, memTable.flush(flusher));
+
+        curCheckpoint.setCheckPoint(4, 4);
+
+        random.nextBytes(data);
+        memTable.addEntry(ledgerId, 101, ByteBuffer.wrap(data), this);
+        assertEquals(0, memTable.flush(flusher));
+
+        // The entry added under (4, 4) is expected to be flushed only for a
+        // checkpoint beyond it: (3, 3) writes nothing, (4, 5) does.
+        assertEquals(0, memTable.flush(flusher, new TestCheckPoint(3, 3)));
+        assertTrue("flush up to (4, 5) should write the new entry",
+                0 < memTable.flush(flusher, new TestCheckPoint(4, 5)));
+    }
+
+    /**
+     * Test snapshot/flush interaction: every entry added to the mem-table must
+     * eventually reach the flusher, whatever mix of snapshots and partial
+     * flushes happened along the way.
+     * @throws IOException
+     */
+    @Test
+    public void testFlushSnapshot() throws IOException {
+        HashSet<EntryKeyValue> insertedKVs = new HashSet<EntryKeyValue>();
+        HashSet<EntryKeyValue> flushedKVs = new HashSet<EntryKeyValue>();
+        KVFLusher flusher = new KVFLusher(flushedKVs);
+
+        byte[] payload = new byte[10];
+        for (long entryId = 1; entryId < 100; entryId++) {
+            for (long ledgerId = 1; ledgerId < 100; ledgerId++) {
+                random.nextBytes(payload);
+                assertTrue(ledgerId + ":" + entryId + " is duplicate in mem-table!",
+                        memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(payload), this) != 0);
+                assertTrue(ledgerId + ":" + entryId + " is duplicate in hash-set!",
+                        insertedKVs.add(memTable.getEntry(ledgerId, entryId)));
+                // && short-circuits left to right, so snapshot() and the second
+                // coin flip run under exactly the conditions nested ifs would allow.
+                boolean snapshotTaken = random.nextInt(16) == 0 && null != memTable.snapshot();
+                if (snapshotTaken && random.nextInt(2) == 0) {
+                    memTable.flush(flusher);
+                }
+            }
+        }
+
+        memTable.flush(flusher, Checkpoint.MAX);
+        for (EntryKeyValue kv : insertedKVs) {
+            assertTrue("kv " + kv.toString() + " was not flushed!", flushedKVs.contains(kv));
+        }
+    }
+
+    /**
+     * Test NoLedger exception/flush interaction: flushing through a flusher
+     * that throws {@link NoLedgerException} for every entry must not make
+     * {@code flush} itself fail.
+     * @throws IOException
+     */
+    @Test
+    public void testNoLedgerException() throws IOException {
+        NoLedgerFLusher flusher = new NoLedgerFLusher();
+
+        byte[] data = new byte[10];
+        for (long entryId = 1; entryId < 100; entryId++) {
+            for (long ledgerId = 1; ledgerId < 100; ledgerId++) {
+                random.nextBytes(data);
+                // Without this insert the mem-table stays empty, the flusher is
+                // never invoked and the test exercises nothing.
+                // NOTE(review): assumes flush() tolerates NoLedgerException from
+                // the flusher, as the test name implies - confirm against EntryMemTable.
+                memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data), this);
+                if (random.nextInt(16) == 0) {
+                    if (null != memTable.snapshot()) {
+                        memTable.flush(flusher);
+                    }
+                }
+            }
+        }
+
+        memTable.flush(flusher, Checkpoint.MAX);
+    }
+
+    /**
+     * Checkpoint backed by a {@link LogMark}. Ordering follows the mark, and
+     * every instance sorts before {@code Checkpoint.MAX}.
+     */
+    private static class TestCheckPoint implements Checkpoint {
+
+        LogMark mark;
+
+        public TestCheckPoint(long fid, long fpos) {
+            this.mark = new LogMark(fid, fpos);
+        }
+
+        /** Moves this checkpoint in place by updating the underlying mark. */
+        private void setCheckPoint(long fid, long fpos) {
+            this.mark.setLogMark(fid, fpos);
+        }
+
+        @Override
+        public int compareTo(Checkpoint o) {
+            // Anything other than MAX is cast to TestCheckPoint and compared by mark.
+            return (Checkpoint.MAX == o) ? -1 : this.mark.compare(((TestCheckPoint) o).mark);
+        }
+
+    }
+}
+
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSkipListArena.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSkipListArena.java
new file mode 100644
index 0000000..e42eacc
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSkipListArena.java
@@ -0,0 +1,202 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.bookie.SkipListArena.MemorySlice;
+import org.junit.Test;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.TreeMap;
+import com.google.common.primitives.Ints;
+
+public class TestSkipListArena {
+
+    /**
+     * Server configuration with a fixed arena chunk size of 4096 and a
+     * maximum arena allocation size of 1024, so the limits the tests rely on
+     * do not depend on defaults. Also reports journal flush-when-queue-empty
+     * as enabled.
+     * Static because it needs no state from the enclosing test instance.
+     */
+    static class CustomConfiguration extends ServerConfiguration {
+        @Override
+        public int getSkipListArenaChunkSize() {
+            return 4096;
+        }
+        @Override
+        public int getSkipListArenaMaxAllocSize() {
+            return 1024;
+        }
+        @Override
+        public boolean getJournalFlushWhenQueueEmpty() {
+            return true;
+        }
+
+    }
+
+    final CustomConfiguration cfg = new CustomConfiguration();
+
+    /**
+     * Test random allocations: consecutive allocations served from the same
+     * backing array must be laid out back to back and stay inside that array.
+     */
+    @Test
+    public void testRandomAllocation() {
+        Random rand = new Random();
+        SkipListArena arena = new SkipListArena(cfg);
+        int expectedOff = 0;
+        byte[] lastBuffer = null;
+
+        // 10K iterations by 0-512 alloc -> 2560kB expected
+        // should be reasonable for unit test and also cover wraparound
+        // behavior
+        for (int i = 0; i < 10000; i++) {
+            int size = rand.nextInt(512);
+            MemorySlice alloc = arena.allocateBytes(size);
+
+            // A different backing array means the arena handed out a new
+            // buffer, so offsets start again from zero.
+            if (alloc.getData() != lastBuffer) {
+                expectedOff = 0;
+                lastBuffer = alloc.getData();
+            }
+            // assertEquals shows expected vs. actual offset on failure.
+            assertEquals("Allocation " + alloc + " is not contiguous with the previous one",
+              expectedOff, alloc.getOffset());
+            assertTrue("Allocation " + alloc + " overruns buffer",
+              alloc.getOffset() + size <= alloc.getData().length);
+            expectedOff += size;
+        }
+    }
+
+    /**
+     * Requests 2048 bytes, twice the max allocation size configured in
+     * {@code cfg}, and expects the arena to return null.
+     */
+    @Test
+    public void testLargeAllocation() {
+        final int oversized = 1024 + 1024;
+        MemorySlice alloc = new SkipListArena(cfg).allocateBytes(oversized);
+        assertNull("2KB allocation shouldn't be satisfied by LAB.", alloc);
+    }
+
+    /**
+     * Map key that identifies a byte[] by reference, so allocations can be
+     * grouped by the exact backing array they share rather than by contents.
+     * Static because it needs no state from the enclosing test instance.
+     */
+    private static class ByteArray {
+        final byte[] bytes;
+        ByteArray(final byte[] bytes) {
+            this.bytes = bytes;
+        }
+
+        @Override
+        public int hashCode() {
+            // Arrays inherit Object.hashCode(), which is identity-based; say so explicitly.
+            return System.identityHashCode(bytes);
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            if (object instanceof ByteArray) {
+                // Reference comparison on purpose: same array instance, not same contents.
+                return this.bytes == ((ByteArray) object).bytes;
+            }
+            return false;
+        }
+    }
+
+    private static class AllocBuffer implements Comparable<AllocBuffer>{
+        private final MemorySlice alloc;
+        private final int size;
+        public AllocBuffer(MemorySlice alloc, int size) {
+            super();
+            this.alloc = alloc;
+            this.size = size;
+        }
+
+        @Override
+        public int compareTo(AllocBuffer e) {
+            assertTrue(alloc.getData() == e.alloc.getData());
+            return Ints.compare(alloc.getOffset(), e.alloc.getOffset());
+        }
+
+        @Override
+        public String toString() {
+          return alloc + ":" + size;
+        }
+    }
+
+    /**
+     * Builds (but does not start) a worker thread that performs 1000
+     * allocations of random size (0-511 bytes) from the arena, records each
+     * one in the queue, and counts the latch down when it is done.
+     *
+     * @param queue receives one AllocBuffer per allocation
+     * @param latch counted down exactly once when the worker exits
+     * @param arena the arena to allocate from
+     * @return the unstarted worker thread
+     */
+    private Thread getAllocThread(final ConcurrentLinkedQueue<AllocBuffer> queue,
+                                  final CountDownLatch latch,
+                                  final SkipListArena arena) {
+        return new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Random rand = new Random();
+                    for (int j = 0; j < 1000; j++) {
+                        int size = rand.nextInt(512);
+                        MemorySlice alloc = arena.allocateBytes(size);
+                        queue.add(new AllocBuffer(alloc, size));
+                    }
+                } finally {
+                    // Release the latch even if an allocation throws, so a
+                    // failing worker cannot leave latch.await() blocked forever.
+                    latch.countDown();
+                }
+            }
+        });
+    }
+
+    /**
+     * Test concurrent allocation, check the results don't overlap: ten
+     * threads allocate from one arena, then the allocations are grouped by
+     * backing array and checked to be unique and contiguous within each array.
+     */
+    @Test
+    public void testConcurrency() throws Exception {
+        final SkipListArena arena = new SkipListArena(cfg);
+        final CountDownLatch latch = new CountDownLatch(10);
+        final ConcurrentLinkedQueue<AllocBuffer> queue = new ConcurrentLinkedQueue<AllocBuffer>();
+
+        Set<Thread> testThreads = new HashSet<Thread>();
+        for (int i = 0; i < 10; i++) {
+            testThreads.add(getAllocThread(queue, latch, arena));
+        }
+
+        for (Thread thread : testThreads) {
+            thread.start();
+        }
+        latch.await();
+
+        // Partition the allocations by the actual byte[] they share,
+        // make sure offsets are unique and non-overlap for each buffer.
+        Map<ByteArray, Map<Integer, AllocBuffer>> mapsByArray =
+                new HashMap<ByteArray, Map<Integer, AllocBuffer>>();
+
+        final AllocBuffer[] buffers = queue.toArray(new AllocBuffer[0]);
+        for (AllocBuffer buf : buffers) {
+            // Zero-length allocations take no space and are left out of the check.
+            if (buf.size != 0) {
+                ByteArray ptr = new ByteArray(buf.alloc.getData());
+                Map<Integer, AllocBuffer> treeMap = mapsByArray.get(ptr);
+                if (treeMap == null) {
+                    // TreeMap keeps allocations sorted by offset for the pass below.
+                    treeMap = new TreeMap<Integer, AllocBuffer>();
+                    mapsByArray.put(ptr, treeMap);
+                }
+                AllocBuffer other = treeMap.put(Integer.valueOf(buf.alloc.getOffset()), buf);
+                if (other != null) {
+                    fail("Buffer " + other.toString() + " overlapped with " + buf.toString());
+                }
+            }
+        }
+
+        // Now check each byte array to make sure allocations don't overlap
+        for (Map<Integer, AllocBuffer> treeMap : mapsByArray.values()) {
+            int expectedOff = 0;
+            for (AllocBuffer buf : treeMap.values()) {
+                assertEquals(expectedOff, buf.alloc.getOffset());
+                expectedOff += buf.size;
+            }
+        }
+    }
+}
+

-- 
To stop receiving notification emails like this one, please contact
commits@bookkeeper.apache.org.

Mime
View raw message