jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ju...@apache.org
Subject svn commit: r1575372 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: AbstractStore.java file/FileStore.java
Date Fri, 07 Mar 2014 18:56:50 GMT
Author: jukka
Date: Fri Mar  7 18:56:50 2014
New Revision: 1575372

URL: http://svn.apache.org/r1575372
Log:
OAK-593: Segment-based MK

Inline AbstractStore into FileStore to make the caching code easier to push down to RandomAccess

Removed:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/AbstractStore.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1575372&r1=1575371&r2=1575372&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Fri Mar  7 18:56:50 2014
@@ -20,6 +20,7 @@ import static com.google.common.base.Pre
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Lists.newCopyOnWriteArrayList;
+import static com.google.common.collect.Sets.newHashSet;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.apache.jackrabbit.oak.plugins.segment.SegmentIdFactory.isBulkSegmentId;
@@ -29,32 +30,55 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
-import javax.annotation.Nonnull;
-
 import org.apache.jackrabbit.oak.api.Blob;
-import org.apache.jackrabbit.oak.plugins.segment.AbstractStore;
+import org.apache.jackrabbit.oak.cache.CacheLIRS;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentIdFactory;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FileStore extends AbstractStore {
+import com.google.common.cache.Cache;
+
+public class FileStore implements SegmentStore {
 
     private static final Logger log = LoggerFactory.getLogger(FileStore.class);
 
+    private static final int MB = 1024 * 1024;
+
     private static final int DEFAULT_MEMORY_CACHE_SIZE = 256;
 
     private static final String FILE_NAME_FORMAT = "%s%05d.tar";
 
     private static final String JOURNAL_FILE_NAME = "journal.log";
 
+    private final SegmentIdFactory factory = new SegmentIdFactory();
+
+    private final SegmentWriter writer = new SegmentWriter(this, factory);
+
+    protected final Cache<UUID, Segment> segments;
+
+    /**
+     * Identifiers of the segments that are currently being loaded.
+     */
+    private final Set<UUID> currentlyLoading = newHashSet();
+
+    /**
+     * Number of threads that are currently waiting for segments to be loaded.
+     * Used to avoid extra {@link #notifyAll()} calls when nobody is waiting.
+     */
+    private int currentlyWaiting = 0;
+
     private final File directory;
 
     private final int maxFileSize;
@@ -103,12 +127,21 @@ public class FileStore extends AbstractS
     public FileStore(
             final File directory, NodeState initial, int maxFileSizeMB,
             int cacheSizeMB, boolean memoryMapping) throws IOException {
-        super(cacheSizeMB);
         checkNotNull(directory).mkdirs();
         this.directory = directory;
         this.maxFileSize = maxFileSizeMB * MB;
         this.memoryMapping = memoryMapping;
 
+        if (memoryMapping) {
+            // let the OS take care of caching
+            this.segments = null;
+        } else {
+            this.segments = CacheLIRS.newBuilder()
+                    .weigher(Segment.WEIGHER)
+                    .maximumWeight(cacheSizeMB * MB)
+                    .build();
+        }
+
         for (int i = 0; true; i++) {
             String name = String.format(FILE_NAME_FORMAT, "bulk", i);
             File file = new File(directory, name);
@@ -218,6 +251,23 @@ public class FileStore extends AbstractS
     }
 
     @Override
+    public SegmentWriter getWriter() {
+        return writer;
+    }
+
+    @Override
+    public SegmentNodeState getHead() {
+        return new SegmentNodeState(getWriter().getDummySegment(), head.get());
+    }
+
+    @Override
+    public boolean setHead(SegmentNodeState base, SegmentNodeState head) {
+        RecordId id = this.head.get();
+        return id.equals(base.getRecordId())
+                && this.head.compareAndSet(id, head.getRecordId());
+    }
+
+    @Override
     public void close() {
         try {
             // avoid deadlocks while joining the flush thread
@@ -230,7 +280,18 @@ public class FileStore extends AbstractS
             }
 
             synchronized (this) {
-                super.close();
+                if (segments != null) {
+                    synchronized (segments) {
+                        while (!currentlyLoading.isEmpty()) {
+                            try {
+                                segments.wait(); // for concurrent loads to finish
+                            } catch (InterruptedException e) {
+                                throw new RuntimeException("Interrupted", e);
+                            }
+                        }
+                        segments.invalidateAll();
+                    }
+                }
 
                 flush();
 
@@ -254,29 +315,67 @@ public class FileStore extends AbstractS
     }
 
     @Override
-    public SegmentNodeState getHead() {
-        return new SegmentNodeState(getWriter().getDummySegment(), head.get());
-    }
+    public Segment readSegment(UUID id) {
+        if (isBulkSegmentId(id)) {
+            return loadSegment(id, bulkFiles);
+        }
 
-    @Override
-    public boolean setHead(SegmentNodeState base, SegmentNodeState head) {
-        RecordId id = this.head.get();
-        return id.equals(base.getRecordId())
-                && this.head.compareAndSet(id, head.getRecordId());
-    }
+        Segment segment = getWriter().getCurrentSegment(id);
+        if (segment != null) {
+            return segment;
+        }
 
-    @Override @Nonnull
-    protected Segment loadSegment(UUID id) {
-        List<TarFile> files = dataFiles;
-        if (isBulkSegmentId(id)) {
-            files = bulkFiles;
+        if (segments == null) {
+            // no in-memory cache (memory mapping), load the segment directly
+            return loadSegment(id, dataFiles);
         }
 
+        synchronized (segments) {
+            // check if the segment is already cached
+            segment = segments.getIfPresent(id);
+            // ... or currently being loaded
+            while (segment == null && currentlyLoading.contains(id)) {
+                currentlyWaiting++;
+                try {
+                    segments.wait(); // for another thread to load the segment
+                } catch (InterruptedException e) {
+                    throw new RuntimeException("Interrupted", e);
+                } finally {
+                    currentlyWaiting--;
+                }
+                segment = segments.getIfPresent(id);
+            }
+            if (segment != null) {
+                // found the segment in the cache
+                return segment;
+            }
+            // not yet cached, so let others know that we're loading it
+            currentlyLoading.add(id);
+        }
+
+        try {
+            segment = loadSegment(id, dataFiles);
+        } finally {
+            synchronized (segments) {
+                if (segment != null) {
+                    segments.put(id, segment);
+                }
+                currentlyLoading.remove(id);
+                if (currentlyWaiting > 0) {
+                    segments.notifyAll();
+                }
+            }
+        }
+
+        return segment;
+    }
+
+    protected Segment loadSegment(UUID id, List<TarFile> files) {
         for (TarFile file : files) {
             try {
                 ByteBuffer buffer = file.readEntry(id);
                 if (buffer != null) {
-                    return createSegment(id, buffer);
+                    return new Segment(this, factory, id, buffer);
                 }
             } catch (IOException e) {
                 throw new RuntimeException(
@@ -317,4 +416,4 @@ public class FileStore extends AbstractS
         return new FileBlob(reference); // FIXME: proper reference lookup
     }
 
-}
+}
\ No newline at end of file



Mime
View raw message