jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ju...@apache.org
Subject svn commit: r1525946 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/m...
Date Tue, 24 Sep 2013 17:10:06 GMT
Author: jukka
Date: Tue Sep 24 17:10:05 2013
New Revision: 1525946

URL: http://svn.apache.org/r1525946
Log:
OAK-593: Segment-based MK

Use the same segment cache across file and MongoDB backends

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCache.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCache.java?rev=1525946&r1=1525945&r2=1525946&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCache.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCache.java
Tue Sep 24 17:10:05 2013
@@ -16,77 +16,101 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import org.apache.jackrabbit.oak.cache.CacheStats;
 
 /**
- * Combined memory and disk cache for segments.
+ * Memory cache for segments.
  */
 public class SegmentCache {
 
-    private static final long DEFAULT_MEMORY_CACHE_SIZE = 1 << 28; // 256MB
+    private static final int DEFAULT_MEMORY_CACHE_SIZE = 1 << 28; // 256MB
 
-    private final Cache<UUID, Segment> memoryCache;
+    private final int maximumSize;
 
-    private final CacheStats cacheStats;
+    private int currentSize = 0;
 
-    // private final Cache<UUID, File> diskCache;
+    private final Map<UUID, Segment> segments =
+        new LinkedHashMap<UUID, Segment>(1000, 0.75f, true) {
+            @Override
+            protected boolean removeEldestEntry(Entry<UUID, Segment> eldest) {
+                if (currentSize > maximumSize) {
+                    currentSize -= eldest.getValue().size();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        };
 
-    // private final File diskCacheDirectory;
-
-    public SegmentCache(long memoryCacheSize) {
-//        this.diskCacheDirectory = diskCacheDirectory;
-//        this.diskCache = CacheBuilder.newBuilder()
-//                .maximumWeight(diskCacheSize)
-//                .weigher(new Weigher<UUID, File>() {
-//                    @Override
-//                    public int weigh(UUID key, File value) {
-//                        return (int) value.length(); // <= max segment size
-//                    }
-//                }).build();
-        this.memoryCache = CacheBuilder.newBuilder()
-                .maximumWeight(memoryCacheSize)
-                .recordStats()
-                .weigher(Segment.WEIGHER)
-//                .removalListener(new RemovalListener<UUID, Segment>() {
-//                    @Override
-//                    public void onRemoval(
-//                            RemovalNotification<UUID, Segment> notification) {
-//                        notification.getValue();
-//                    }
-//                })
-                .build();
+    private final Set<UUID> currentlyLoading = new HashSet<UUID>();
 
-        cacheStats = new CacheStats(memoryCache, "Segment", Segment.WEIGHER, memoryCacheSize);
+    public SegmentCache(int maximumSize) {
+        this.maximumSize = maximumSize;
     }
 
     public SegmentCache() {
         this(DEFAULT_MEMORY_CACHE_SIZE);
     }
 
-    public Segment getSegment(UUID segmentId, Callable<Segment> loader) {
-        try {
-            return memoryCache.get(segmentId, loader);
-        } catch (ExecutionException e) {
-            throw new IllegalStateException(
-                    "Failed to load segment " + segmentId, e);
+    public Segment getSegment(UUID segmentId, Callable<Segment> loader)
+            throws Exception {
+        synchronized (this) {
+            Segment segment = segments.get(segmentId);
+
+            while (segment == null && currentlyLoading.contains(segmentId)) {
+                wait();
+                segment = segments.get(segmentId);
+            }
+
+            if (segment != null) {
+                return segment;
+            } else {
+                currentlyLoading.add(segmentId);
+            }
+        }
+
+        Segment segment = loader.call();
+        synchronized (this) {
+            segments.put(segmentId, segment);
+            currentSize += segment.size();
+            currentlyLoading.remove(segmentId);
+            notifyAll();
         }
+        return segment;
     }
 
-    public void addSegment(Segment segment) {
-        memoryCache.put(segment.getSegmentId(), segment);
+    public synchronized void addSegment(Segment segment) {
+        checkState(!segments.containsKey(segment.getSegmentId()));
+        checkState(!currentlyLoading.contains(segment.getSegmentId()));
+        segments.put(segment.getSegmentId(), segment);
+        currentSize += segment.size();
     }
 
-    public void removeSegment(UUID segmentId) {
-        memoryCache.invalidate(segmentId);
+    public synchronized void removeSegment(UUID segmentId)
+            throws InterruptedException {
+        while (currentlyLoading.contains(segmentId)) {
+            wait();
+        }
+        Segment segment = segments.remove(segmentId);
+        if (segment != null) {
+            currentSize -= segment.size();
+        }
     }
 
-    public CacheStats getCacheStats() {
-        return cacheStats;
+    public synchronized void clear() throws InterruptedException {
+        while (!currentlyLoading.isEmpty()) {
+            wait();
+        }
+        segments.clear();
+        currentSize = 0;
     }
+
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1525946&r1=1525945&r2=1525946&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
Tue Sep 24 17:10:05 2013
@@ -16,8 +16,6 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
-import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -35,7 +33,6 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Service;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
-import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 import org.apache.jackrabbit.oak.plugins.segment.mongo.MongoStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
@@ -43,8 +40,6 @@ import org.apache.jackrabbit.oak.spi.com
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.apache.jackrabbit.oak.spi.whiteboard.OsgiWhiteboard;
-import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
 import org.osgi.service.component.ComponentContext;
 
 @Component(policy = ConfigurationPolicy.REQUIRE)
@@ -75,7 +70,7 @@ public class SegmentNodeStoreService imp
     @Property(description="Cache size (MB)", intValue=200)
     public static final String CACHE = "cache";
 
-    private static final long MB = 1024 * 1024;
+    private static final int MB = 1024 * 1024;
 
     private String name;
 
@@ -85,8 +80,6 @@ public class SegmentNodeStoreService imp
 
     private NodeStore delegate;
 
-    private Registration cacheStatsReg;
-
     private synchronized NodeStore getDelegate() {
         assert delegate != null : "service must be activated when used";
         return delegate;
@@ -128,9 +121,6 @@ public class SegmentNodeStoreService imp
             mongo = new Mongo(host, port);
             SegmentCache sc = new SegmentCache(cache * MB);
             store = new MongoStore(mongo.getDB(db), sc);
-
-            cacheStatsReg = registerMBean(new OsgiWhiteboard(context.getBundleContext()),
CacheStatsMBean.class,
-                    sc.getCacheStats(), CacheStatsMBean.TYPE, sc.getCacheStats().getName());
         }
 
         delegate = new SegmentNodeStore(store);
@@ -150,10 +140,6 @@ public class SegmentNodeStoreService imp
     public synchronized void deactivate() {
         delegate = null;
 
-        if(cacheStatsReg != null){
-            cacheStatsReg.unregister();
-        }
-
         store.close();
         if (mongo != null) {
             mongo.close();

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1525946&r1=1525945&r2=1525946&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Tue Sep 24 17:10:05 2013
@@ -31,21 +31,18 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
+import java.util.UUID;
 
 import org.apache.jackrabbit.oak.plugins.segment.Journal;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentCache;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
 import org.apache.jackrabbit.oak.plugins.segment.Template;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-
 public class FileStore implements SegmentStore {
 
     private static final long SEGMENT_MAGIC = 0x4f616b0a527845ddL;
@@ -66,8 +63,7 @@ public class FileStore implements Segmen
 
     private final Map<String, Journal> journals = newHashMap();
 
-    private final Cache<UUID, Segment> segments =
-            CacheBuilder.newBuilder().maximumSize(1000).build();
+    private final SegmentCache cache = new SegmentCache();
 
     public FileStore(File directory, int maxFileSize, boolean memoryMapping)
             throws IOException {
@@ -112,17 +108,16 @@ public class FileStore implements Segmen
     }
 
     public synchronized void close() {
-        for (TarFile file : files) {
-            try {
+        try {
+            for (TarFile file : files) {
                 file.close();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
             }
+            files.clear();
+            cache.clear();
+            System.gc(); // for any memory-mappings that are no longer used
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
-        files.clear();
-        segments.invalidateAll();
-        segments.cleanUp();
-        System.gc(); // for any memory-mappings that are no longer used
     }
 
     @Override
@@ -138,41 +133,47 @@ public class FileStore implements Segmen
     @Override
     public Segment readSegment(final UUID id) {
         try {
-            return segments.get(id, new Callable<Segment>() {
+            return cache.getSegment(id, new Callable<Segment>() {
                 @Override
                 public Segment call() throws Exception {
-                    for (TarFile file : files) {
-                        ByteBuffer buffer = file.readEntry(id);
-                        if (buffer != null) {
-                            checkState(SEGMENT_MAGIC == buffer.getLong());
-                            int length = buffer.getInt();
-                            int count = buffer.getInt();
-
-                            checkState(id.equals(new UUID(
-                                    buffer.getLong(), buffer.getLong())));
-
-                            Collection<UUID> referencedIds =
-                                    newArrayListWithCapacity(count);
-                            for (int i = 0; i < count; i++) {
-                                referencedIds.add(new UUID(
-                                        buffer.getLong(), buffer.getLong()));
-                            }
-
-                            buffer.limit(buffer.position() + length);
-                            return new Segment(
-                                    FileStore.this, id,
-                                    buffer.slice(), referencedIds,
-                                    Collections.<String, RecordId>emptyMap(),
-                                    Collections.<Template, RecordId>emptyMap());
-                        }
-                    }
-                    throw new IllegalStateException(
-                            "Segment " + id + " not found");
+                    return loadSegment(id);
                 }
             });
-        } catch (ExecutionException e) {
-            throw new RuntimeException("Failed to load segment " + id, e);
+        } catch (IllegalStateException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Segment loadSegment(UUID id) throws Exception {
+        for (TarFile file : files) {
+            ByteBuffer buffer = file.readEntry(id);
+            if (buffer != null) {
+                checkState(SEGMENT_MAGIC == buffer.getLong());
+                int length = buffer.getInt();
+                int count = buffer.getInt();
+
+                checkState(id.equals(new UUID(
+                        buffer.getLong(), buffer.getLong())));
+
+                Collection<UUID> referencedIds =
+                        newArrayListWithCapacity(count);
+                for (int i = 0; i < count; i++) {
+                    referencedIds.add(new UUID(
+                            buffer.getLong(), buffer.getLong()));
+                }
+
+                buffer.limit(buffer.position() + length);
+                return new Segment(
+                        FileStore.this, id,
+                        buffer.slice(), referencedIds,
+                        Collections.<String, RecordId>emptyMap(),
+                        Collections.<Template, RecordId>emptyMap());
+            }
         }
+
+        throw new IllegalStateException("Segment " + id + " not found");
     }
 
     @Override
@@ -203,7 +204,8 @@ public class FileStore implements Segmen
         }
 
         buffer.position(pos);
-        segments.put(segmentId, new Segment(
+
+        cache.addSegment(new Segment(
                 this, segmentId, buffer.slice(),
                 referencedSegmentIds, strings, templates));
     }
@@ -223,6 +225,11 @@ public class FileStore implements Segmen
     @Override
     public void deleteSegment(UUID segmentId) {
         // TODO: implement
+        try {
+            cache.removeSegment(segmentId);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     synchronized void writeJournals() throws IOException {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java?rev=1525946&r1=1525945&r2=1525946&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java
Tue Sep 24 17:10:05 2013
@@ -70,12 +70,12 @@ public class MongoStore implements Segme
                 concern, builder.getNodeState()));
     }
 
-    public MongoStore(DB db, long cacheSize) {
+    public MongoStore(DB db, int cacheSize) {
         this(db, new SegmentCache(cacheSize));
     }
 
 
-    public MongoStore(Mongo mongo, long cacheSize) {
+    public MongoStore(Mongo mongo, int cacheSize) {
         this(mongo.getDB("Oak"), cacheSize);
     }
 
@@ -96,12 +96,16 @@ public class MongoStore implements Segme
 
     @Override
     public Segment readSegment(final UUID segmentId) {
-        return cache.getSegment(segmentId, new Callable<Segment>() {
-            @Override
-            public Segment call() throws Exception {
-                return findSegment(segmentId);
-            }
-        });
+        try {
+            return cache.getSegment(segmentId, new Callable<Segment>() {
+                @Override
+                public Segment call() throws Exception {
+                    return findSegment(segmentId);
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
@@ -162,6 +166,11 @@ public class MongoStore implements Segme
     @Override
     public void deleteSegment(UUID segmentId) {
         segments.remove(new BasicDBObject("_id", segmentId.toString()));
-        cache.removeSegment(segmentId);
+        try {
+            cache.removeSegment(segmentId);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
     }
+
 }

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java?rev=1525946&r1=1525945&r2=1525946&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
(original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
Tue Sep 24 17:10:05 2013
@@ -32,7 +32,7 @@ import org.apache.jackrabbit.oak.fixture
 
 public class BenchmarkRunner {
 
-    private static final long MB = 1024 * 1024;
+    private static final int MB = 1024 * 1024;
 
     public static void main(String[] args) throws Exception {
         OptionParser parser = new OptionParser();

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java?rev=1525946&r1=1525945&r2=1525946&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java
(original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java
Tue Sep 24 17:10:05 2013
@@ -115,7 +115,7 @@ public abstract class OakRepositoryFixtu
     }
 
     public static RepositoryFixture getSegment(
-            final String host, final int port, final long cacheSize) {
+            final String host, final int port, final int cacheSize) {
         return new OakRepositoryFixture("Oak-Segment") {
             private SegmentStore[] stores;
             private Mongo mongo;



Mime
View raw message