jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ju...@apache.org
Subject svn commit: r1575352 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: Segment.java http/HttpStore.java
Date Fri, 07 Mar 2014 18:17:15 GMT
Author: jukka
Date: Fri Mar  7 18:17:15 2014
New Revision: 1575352

URL: http://svn.apache.org/r1575352
Log:
OAK-593: Segment-based MK

Implement HttpStore without AbstractStore to make the caching code easier to customize

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1575352&r1=1575351&r2=1575352&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
Fri Mar  7 18:17:15 2014
@@ -97,7 +97,7 @@ public class Segment {
      */
     static final int MEDIUM_LIMIT = (1 << (16 - 2)) + SMALL_LIMIT;
 
-    static final Weigher<UUID, Segment> WEIGHER =
+    public static final Weigher<UUID, Segment> WEIGHER =
             new Weigher<UUID, Segment>() {
                 @Override
                 public int weigh(UUID key, Segment value) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java?rev=1575352&r1=1575351&r2=1575352&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java
Fri Mar  7 18:17:15 2014
@@ -17,6 +17,8 @@
 package org.apache.jackrabbit.oak.plugins.segment.http;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.collect.Sets.newHashSet;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentIdFactory.isBulkSegmentId;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -27,24 +29,46 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLConnection;
 import java.nio.ByteBuffer;
+import java.util.Set;
 import java.util.UUID;
 
 import javax.annotation.CheckForNull;
 
-import org.apache.jackrabbit.oak.plugins.segment.AbstractStore;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.cache.CacheLIRS;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentIdFactory;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
 
+import com.google.common.cache.Cache;
 import com.google.common.io.ByteStreams;
 
-public class HttpStore extends AbstractStore {
+public class HttpStore implements SegmentStore {
+
+    protected static final int MB = 1024 * 1024;
 
     private final SegmentIdFactory factory = new SegmentIdFactory();
 
+    private final SegmentWriter writer = new SegmentWriter(this, factory);
+
     private final URL base;
 
+    private final Cache<UUID, Segment> segments;
+
+    /**
+     * Identifiers of the segments that are currently being loaded.
+     */
+    private final Set<UUID> currentlyLoading = newHashSet();
+
+    /**
+     * Number of threads that are currently waiting for segments to be loaded.
+     * Used to avoid extra {@link #notifyAll()} calls when nobody is waiting.
+     */
+    private int currentlyWaiting = 0;
+
     /**
      * @param base
      *            make sure the url ends with a slash "/", otherwise the
@@ -52,13 +76,16 @@ public class HttpStore extends AbstractS
      * @param cacheSizeMB
      */
     public HttpStore(URL base, int cacheSizeMB) {
-        super(cacheSizeMB);
         this.base = base;
+        this.segments = CacheLIRS.newBuilder()
+                .weigher(Segment.WEIGHER)
+                .maximumWeight(cacheSizeMB * MB)
+                .build();
     }
 
-    protected URLConnection get(String fragment) throws MalformedURLException,
-            IOException {
-        return new URL(base, fragment).openConnection();
+    @Override
+    public SegmentWriter getWriter() {
+        return writer;
     }
 
     @Override
@@ -91,14 +118,64 @@ public class HttpStore extends AbstractS
     }
 
     @Override
-    @CheckForNull
-    protected Segment loadSegment(UUID id) {
+    public Segment readSegment(UUID id) {
+        if (isBulkSegmentId(id)) {
+            return loadSegment(id);
+        }
+
+        Segment segment = getWriter().getCurrentSegment(id);
+        if (segment != null) {
+            return segment;
+        }
+
+        synchronized (segments) {
+            // check if the segment is already cached
+            segment = segments.getIfPresent(id);
+            // ... or currently being loaded
+            while (segment == null && currentlyLoading.contains(id)) {
+                currentlyWaiting++;
+                try {
+                    segments.wait(); // for another thread to load the segment
+                } catch (InterruptedException e) {
+                    throw new RuntimeException("Interrupted", e);
+                } finally {
+                    currentlyWaiting--;
+                }
+                segment = segments.getIfPresent(id);
+            }
+            if (segment != null) {
+                // found the segment in the cache
+                return segment;
+            }
+            // not yet cached, so start let others know that we're loading it
+            currentlyLoading.add(id);
+        }
+
+        try {
+            segment = loadSegment(id);
+        } finally {
+            synchronized (segments) {
+                if (segment != null) {
+                    segments.put(id, segment);
+                }
+                currentlyLoading.remove(id);
+                if (currentlyWaiting > 0) {
+                    segments.notifyAll();
+                }
+            }
+        }
+
+        return segment;
+    }
+
+    private Segment loadSegment(UUID uuid) {
         try {
-            final URLConnection connection = get(id.toString());
+            URLConnection connection =
+                    new URL(base, uuid.toString()).openConnection();
             InputStream stream = connection.getInputStream();
             try {
                 byte[] data = ByteStreams.toByteArray(stream);
-                return new Segment(this, factory, id, ByteBuffer.wrap(data));
+                return new Segment(this, factory, uuid, ByteBuffer.wrap(data));
             } finally {
                 stream.close();
             }
@@ -113,7 +190,8 @@ public class HttpStore extends AbstractS
     public void writeSegment(UUID segmentId, byte[] bytes, int offset,
             int length) {
         try {
-            URLConnection connection = get(segmentId.toString());
+            URLConnection connection =
+                    new URL(base, segmentId.toString()).openConnection();
             connection.setDoInput(false);
             connection.setDoOutput(true);
             OutputStream stream = connection.getOutputStream();
@@ -129,4 +207,23 @@ public class HttpStore extends AbstractS
         }
     }
 
+    @Override
+    public void close() {
+        synchronized (segments) {
+            while (!currentlyLoading.isEmpty()) {
+                try {
+                    segments.wait(); // for concurrent loads to finish
+                } catch (InterruptedException e) {
+                    throw new RuntimeException("Interrupted", e);
+                }
+            }
+            segments.invalidateAll();
+        }
+    }
+
+    @Override @CheckForNull
+    public Blob readBlob(String reference) {
+        return null;
+    }
+
 }



Mime
View raw message