Return-Path: X-Original-To: apmail-jackrabbit-oak-commits-archive@minotaur.apache.org Delivered-To: apmail-jackrabbit-oak-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 37B1610CC7 for ; Fri, 6 Dec 2013 01:22:49 +0000 (UTC) Received: (qmail 52088 invoked by uid 500); 6 Dec 2013 01:22:49 -0000 Delivered-To: apmail-jackrabbit-oak-commits-archive@jackrabbit.apache.org Received: (qmail 52046 invoked by uid 500); 6 Dec 2013 01:22:49 -0000 Mailing-List: contact oak-commits-help@jackrabbit.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: oak-dev@jackrabbit.apache.org Delivered-To: mailing list oak-commits@jackrabbit.apache.org Received: (qmail 52037 invoked by uid 99); 6 Dec 2013 01:22:49 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Dec 2013 01:22:49 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Dec 2013 01:22:47 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 48B09238889B; Fri, 6 Dec 2013 01:22:27 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1548354 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: SegmentNodeStore.java file/FileStore.java Date: Fri, 06 Dec 2013 01:22:27 -0000 To: oak-commits@jackrabbit.apache.org From: jukka@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131206012227.48B09238889B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jukka Date: Fri Dec 6 01:22:26 2013 New Revision: 1548354 URL: http://svn.apache.org/r1548354 Log: OAK-593: 
Segment-based MK Use AtomicReferences to track the latest head state Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1548354&r1=1548353&r2=1548354&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Fri Dec 6 01:22:26 2013 @@ -30,6 +30,7 @@ import java.io.InputStream; import java.util.Random; import java.util.UUID; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; @@ -61,7 +62,7 @@ public class SegmentNodeStore implements /** * Local copy of the head of the journal associated with this store. */ - private volatile SegmentNodeState head; + private final AtomicReference<SegmentNodeState> head; /** * Semaphore that controls access to the {@link #head} variable. 
@@ -75,8 +76,8 @@ public class SegmentNodeStore implements public SegmentNodeStore(SegmentStore store, String journal) { this.store = store; this.journal = store.getJournal(journal); - this.head = new SegmentNodeState( - store.getWriter().getDummySegment(), this.journal.getHead()); + this.head = new AtomicReference<SegmentNodeState>(new SegmentNodeState( + store.getWriter().getDummySegment(), this.journal.getHead())); this.changeDispatcher = new ChangeDispatcher(getRoot()); } @@ -93,15 +94,16 @@ public class SegmentNodeStore implements } /** - * Refreshes the head state. Does nothing if a concurrent local commit is - * in progress, as that commit will automatically refresh the head state. + * Refreshes the head state. Should only be called while holding a + * permit from the {@link #commitSemaphore}. */ private void refreshHead() { RecordId id = journal.getHead(); - if (!id.equals(head.getRecordId())) { - head = new SegmentNodeState( + if (!id.equals(head.get().getRecordId())) { + SegmentNodeState state = new SegmentNodeState( store.getWriter().getDummySegment(), id); - changeDispatcher.contentChanged(head.getChildNode(ROOT), null); + head.set(state); + changeDispatcher.contentChanged(state.getChildNode(ROOT), null); } } @@ -119,7 +121,7 @@ public class SegmentNodeStore implements commitSemaphore.release(); } } - return head.getChildNode(ROOT); + return head.get().getChildNode(ROOT); } @Override @@ -196,14 +198,14 @@ public class SegmentNodeStore implements try { refreshHead(); - SegmentNodeState ns = head; - RecordId ri = head.getRecordId(); + SegmentNodeState state = head.get(); + RecordId ri = state.getRecordId(); - SegmentNodeBuilder builder = ns.builder(); + SegmentNodeBuilder builder = state.builder(); NodeBuilder cp = builder.child(name); cp.setProperty("timestamp", System.currentTimeMillis() + lifetime); - cp.setChildNode(ROOT, ns.getChildNode(ROOT)); + cp.setChildNode(ROOT, state.getChildNode(ROOT)); if (journal.setHead(ri, builder.getNodeState() .getRecordId())) { @@ 
-222,7 +224,7 @@ public class SegmentNodeStore implements @Override @CheckForNull public NodeState retrieve(@Nonnull String checkpoint) { - NodeState cp = head.getChildNode(checkpoint).getChildNode(ROOT); + NodeState cp = head.get().getChildNode(checkpoint).getChildNode(ROOT); if (cp.exists()) { return cp; } @@ -252,13 +254,13 @@ public class SegmentNodeStore implements } private boolean setHead(SegmentNodeBuilder builder) { - SegmentNodeState base = builder.getBaseState(); - SegmentNodeState head = builder.getNodeState(); + SegmentNodeState before = builder.getBaseState(); + SegmentNodeState after = builder.getNodeState(); refreshHead(); - if (journal.setHead(base.getRecordId(), head.getRecordId())) { - SegmentNodeStore.this.head = head; - changeDispatcher.contentChanged(head.getChildNode(ROOT), info); + if (journal.setHead(before.getRecordId(), after.getRecordId())) { + head.set(after); + changeDispatcher.contentChanged(after.getChildNode(ROOT), info); refreshHead(); return true; } else { @@ -267,8 +269,9 @@ public class SegmentNodeStore implements } private SegmentNodeBuilder prepare() throws CommitFailedException { - SegmentNodeBuilder builder = head.builder(); - if (fastEquals(before, head.getChildNode(ROOT))) { + SegmentNodeState state = head.get(); + SegmentNodeBuilder builder = state.builder(); + if (fastEquals(before, state.getChildNode(ROOT))) { // use a shortcut when there are no external changes builder.setChildNode(ROOT, hook.processCommit(before, after)); } else { @@ -293,8 +296,9 @@ public class SegmentNodeStore implements long start = System.nanoTime(); refreshHead(); - if (head.hasProperty("token") - && head.getLong("timeout") >= currentTimeMillis()) { + SegmentNodeState state = head.get(); + if (state.hasProperty("token") + && state.getLong("timeout") >= currentTimeMillis()) { // someone else has a pessimistic lock on the journal, // so we should not try to commit anything yet } else { @@ -321,15 +325,16 @@ public class SegmentNodeStore implements 
throws CommitFailedException, InterruptedException { while (true) { long now = currentTimeMillis(); - if (head.hasProperty("token") - && head.getLong("timeout") >= now) { + SegmentNodeState state = head.get(); + if (state.hasProperty("token") + && state.getLong("timeout") >= now) { // locked by someone else, wait until unlocked or expired Thread.sleep( - Math.min(head.getLong("timeout") - now, 1000), + Math.min(state.getLong("timeout") - now, 1000), random.nextInt(1000000)); } else { // attempt to acquire the lock - SegmentNodeBuilder builder = head.builder(); + SegmentNodeBuilder builder = state.builder(); builder.setProperty("token", UUID.randomUUID().toString()); builder.setProperty("timeout", now + timeout); @@ -358,7 +363,7 @@ public class SegmentNodeStore implements pessimisticMerge(timeout); } } - return head.getChildNode(ROOT); + return head.get().getChildNode(ROOT); } } Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1548354&r1=1548353&r2=1548354&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Fri Dec 6 01:22:26 2013 @@ -33,6 +33,7 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; @@ -40,7 +41,6 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.segment.Journal; import org.apache.jackrabbit.oak.plugins.segment.RecordId; import org.apache.jackrabbit.oak.plugins.segment.Segment; -import 
org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; import org.slf4j.Logger; @@ -68,9 +68,16 @@ public class FileStore extends AbstractS private final RandomAccessFile journalFile; - private volatile RecordId head; + /** + * The latest head of the root journal. + */ + private final AtomicReference<RecordId> head; - private volatile boolean updated = false; + /** + * The persisted head of the root journal, used to determine whether the + * latest {@link #head} value should be written to the disk. + */ + private RecordId persistedHead = null; /** * The background flush thread. Automatically flushes the TarMK state @@ -94,7 +101,8 @@ public class FileStore extends AbstractS this(directory, EMPTY_NODE, maxFileSizeMB, cacheSizeMB, memoryMapping); } - public FileStore(File directory, NodeState initial, int maxFileSizeMB, + public FileStore( + final File directory, NodeState initial, int maxFileSizeMB, int cacheSizeMB, boolean memoryMapping) throws IOException { super(cacheSizeMB); checkNotNull(directory).mkdirs(); @@ -122,25 +130,24 @@ public class FileStore extends AbstractS } } - head = null; journalFile = new RandomAccessFile( new File(directory, JOURNAL_FILE_NAME), "rw"); String line = journalFile.readLine(); while (line != null) { int space = line.indexOf(' '); if (space != -1) { - head = RecordId.fromString(line.substring(0, space)); + persistedHead = RecordId.fromString(line.substring(0, space)); } line = journalFile.readLine(); } - if (head == null) { + if (persistedHead != null) { + head = new AtomicReference<RecordId>(persistedHead); + } else { NodeBuilder builder = EMPTY_NODE.builder(); builder.setChildNode("root", initial); - SegmentNodeState root = - getWriter().writeNode(builder.getNodeState()); - head = root.getRecordId(); - updated = true; + head = new AtomicReference<RecordId>( + getWriter().writeNode(builder.getNodeState()).getRecordId()); } this.flushThread = new 
Thread(new Runnable() { @@ -153,7 +160,7 @@ public class FileStore extends AbstractS flush(); } catch (IOException e) { log.warn("Failed to flush the TarMK at" + - FileStore.this.directory, e); + directory, e); } timeToClose.await(5, SECONDS); } @@ -169,7 +176,8 @@ public class FileStore extends AbstractS } public synchronized void flush() throws IOException { - if (updated) { + RecordId id = head.get(); + if (!id.equals(persistedHead)) { getWriter().flush(); for (TarFile file : bulkFiles) { file.flush(); @@ -177,8 +185,9 @@ public class FileStore extends AbstractS for (TarFile file : dataFiles) { file.flush(); } - journalFile.writeBytes(head + " root\n"); + journalFile.writeBytes(id + " root\n"); journalFile.getChannel().force(false); + persistedHead = id; } } @@ -235,19 +244,12 @@ public class FileStore extends AbstractS return new Journal() { @Override public RecordId getHead() { - return head; + return head.get(); } @Override - public boolean setHead(RecordId base, RecordId head) { - synchronized (FileStore.this) { - if (base.equals(FileStore.this.head)) { - updated = !head.equals(FileStore.this.head); - FileStore.this.head = head; - return true; - } else { - return false; - } - } + public boolean setHead(RecordId before, RecordId after) { + RecordId id = head.get(); + return id.equals(before) && head.compareAndSet(id, after); } @Override public void merge() {