jena-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aj...@apache.org
Subject jena git commit: Trialing a per-graph writer dataset
Date Sat, 07 Jan 2017 20:33:18 GMT
Repository: jena
Updated Branches:
  refs/heads/ThreadPerGraphDataset [created] 8601cd302


Trialing a per-graph writer dataset


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/8601cd30
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/8601cd30
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/8601cd30

Branch: refs/heads/ThreadPerGraphDataset
Commit: 8601cd3028c0e57dbc805efc5d54d7b9ee272c3f
Parents: c987c1b
Author: ajs6f <ajs6f@virginia.edu>
Authored: Sat Jan 7 15:33:01 2017 -0500
Committer: ajs6f <ajs6f@virginia.edu>
Committed: Sat Jan 7 15:33:01 2017 -0500

----------------------------------------------------------------------
 .../core/DatasetGraphPerGraphLocking.java       | 295 +++++++++++++++++++
 .../core/JenaTransactionRegionException.java    |  14 +
 .../core/TS_DatasetGraphPerGraphLocking.java    |  11 +
 .../jena/sparql/core/pergraph/BasicTest.java    |  14 +
 .../sparql/core/pergraph/FindPatternsTest.java  |  14 +
 .../jena/sparql/core/pergraph/FindTest.java     |  14 +
 .../jena/sparql/core/pergraph/LockTest.java     |  16 +
 .../core/pergraph/MultithreadingTest.java       |  75 +++++
 .../core/pergraph/TransactionLifecycleTest.java |  22 ++
 .../jena/sparql/core/pergraph/ViewTest.java     |  13 +
 10 files changed, 488 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphPerGraphLocking.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphPerGraphLocking.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphPerGraphLocking.java
new file mode 100644
index 0000000..30da6ed
--- /dev/null
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphPerGraphLocking.java
@@ -0,0 +1,295 @@
+package org.apache.jena.sparql.core;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.jena.atlas.iterator.Iter.filter;
+import static org.apache.jena.graph.GraphUtil.addInto;
+import static org.apache.jena.query.ReadWrite.READ;
+import static org.apache.jena.query.ReadWrite.WRITE;
+import static org.apache.jena.sparql.core.Quad.defaultGraphIRI;
+import static org.apache.jena.sparql.core.Quad.isDefaultGraph;
+import static org.apache.jena.util.iterator.WrappedIterator.createNoRemove;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+import org.apache.jena.graph.Graph;
+import org.apache.jena.graph.Node;
+import org.apache.jena.graph.Triple;
+import org.apache.jena.graph.impl.GraphBase;
+import org.apache.jena.query.ReadWrite;
+import org.apache.jena.sparql.JenaTransactionException;
+import org.apache.jena.sparql.core.mem.TransactionalComponent;
+import org.apache.jena.sparql.core.mem.TriTable;
+import org.apache.jena.util.iterator.ExtendedIterator;
+
+/**
+ * A {@link DatasetGraph} in which every graph carries its own write lock, so that writers working on different
+ * graphs can proceed at the same time.
+ * <p>
+ * A transaction on this dataset is confined to a single graph: the first graph obtained through
+ * {@link #getGraph(Node)} inside a transaction becomes that transaction's graph, and asking for any other graph
+ * throws {@link JenaTransactionRegionException}. Transaction state is kept in {@link ThreadLocal}s, so each thread
+ * runs its own transaction.
+ */
+public class DatasetGraphPerGraphLocking extends DatasetGraphCollection {
+
+    /** Every graph created so far, keyed by name; the default graph is kept under {@link Quad#defaultGraphIRI}. */
+    private final Map<Node, LockableGraph> graphs = new ConcurrentHashMap<>();
+
+    /** The graph the current thread's transaction is working on, or {@code null} if it has not touched one yet. */
+    private final ThreadLocal<LockableGraph> graphInTransaction = new ThreadLocal<>();
+
+    private LockableGraph graphInTransaction() {
+        return graphInTransaction.get();
+    }
+
+    /** The type of the current thread's transaction, or {@code null} when it is not in a transaction. */
+    private final ThreadLocal<ReadWrite> transactionType = new ThreadLocal<>();
+
+    private ReadWrite transactionType() {
+        return transactionType.get();
+    }
+
+    /**
+     * Lists the names of all graphs other than the default graph. Because {@link #getGraph(Node)} creates a graph
+     * the first time its name is requested, a name appears here once it has been asked for, even if that graph is
+     * empty.
+     */
+    @Override
+    public Iterator<Node> listGraphNodes() {
+        return filter(graphs.keySet().iterator(), gn -> !isDefaultGraph(gn));
+    }
+
+    @Override
+    public boolean supportsTransactions() {
+        return true;
+    }
+
+    @Override
+    public boolean isInTransaction() {
+        return transactionType() != null;
+    }
+
+    /**
+     * Begins a transaction for the current thread. No graph is locked yet; that happens when the transaction first
+     * obtains a graph through {@link #getGraph(Node)}.
+     *
+     * @param readWrite the type of transaction to begin
+     * @throws JenaTransactionException if the current thread is already in a transaction
+     */
+    @Override
+    public void begin(ReadWrite readWrite) {
+        if (isInTransaction()) throw new JenaTransactionException("Cannot nest transactions!");
+        transactionType.set(readWrite);
+    }
+
+    /** Applies {@code action} to the current thread's transaction graph, if the transaction has obtained one. */
+    private void graphAction(Consumer<LockableGraph> action) {
+        if (graphInTransaction() != null) action.accept(graphInTransaction());
+    }
+
+    /**
+     * Commits the current thread's transaction.
+     *
+     * @throws JenaTransactionException if the current thread is not in a transaction
+     */
+    @Override
+    public void commit() {
+        if (!isInTransaction()) throw new JenaTransactionException("Cannot commit outside a transaction!");
+        graphAction(LockableGraph::commit);
+        cleanAfterTransaction();
+    }
+
+    /**
+     * Aborts the current thread's transaction.
+     *
+     * @throws JenaTransactionException if the current thread is not in a transaction
+     */
+    @Override
+    public void abort() {
+        if (!isInTransaction()) throw new JenaTransactionException("Cannot abort outside a transaction!");
+        graphAction(LockableGraph::abort);
+        cleanAfterTransaction();
+    }
+
+    /**
+     * Finishes the current thread's transaction.
+     * <p>
+     * In some cases the transaction's graph is still in hand: it was neither committed nor aborted, or its commit or
+     * abort threw. That graph's own transaction is then ended here. This releases the graph's write lock, if held, and
+     * lets this thread use the graph in later transactions.
+     * <p>
+     * After a successful commit or abort no graph is left in hand, so this only clears the thread's transaction state.
+     */
+    @Override
+    public void end() {
+        try {
+            graphAction(LockableGraph::end);
+        } finally {
+            cleanAfterTransaction();
+        }
+    }
+
+    /** Forgets the current thread's transaction type and transaction graph. */
+    private void cleanAfterTransaction() {
+        graphInTransaction.remove();
+        transactionType.remove();
+    }
+
+    @Override
+    public Graph getDefaultGraph() {
+        return getGraph(defaultGraphIRI);
+    }
+
+    /** Replaces the contents of the default graph with the triples of {@code g}. */
+    @Override
+    public void setDefaultGraph(final Graph g) {
+        addGraph(defaultGraphIRI, g);
+    }
+
+    /**
+     * Returns the graph with the given name, creating an empty one if none exists yet.
+     * <p>
+     * Outside a transaction the graph is simply handed out. Each operation on it then runs in its own graph-level
+     * transaction.
+     * <p>
+     * Inside a transaction, the first graph requested is begun in the transaction's mode and becomes the
+     * transaction's graph. For a WRITE transaction, beginning it waits for the graph's write lock. Requesting that
+     * same graph again returns it; requesting any other graph is an error.
+     *
+     * @param graphName the name of the graph
+     * @return the graph
+     * @throws JenaTransactionRegionException if the current transaction is already working on a different graph
+     */
+    @Override
+    public Graph getGraph(final Node graphName) {
+        // we must work without a transaction
+        if (!isInTransaction()) return graphs.computeIfAbsent(graphName, LockableGraph::new);
+        final LockableGraph current = graphInTransaction();
+        if (current != null) {
+            // are we using the current graph?
+            if (graphName.equals(current.graphName())) return current;
+            // we already have a graph in hand and cannot use another!
+            throw new JenaTransactionRegionException("Cannot work with more than one graph per transaction!");
+        }
+        // we are starting work with a graph
+        final LockableGraph graph = graphs.computeIfAbsent(graphName, LockableGraph::new);
+        graph.begin(transactionType());
+        graphInTransaction.set(graph);
+        return graph;
+    }
+
+    /**
+     * Removes every triple from every graph. The graphs themselves stay registered under their names.
+     * <p>
+     * Inside a WRITE transaction, each graph other than the transaction's own is write-locked. It is then cleared and
+     * committed in its own graph-level transaction straight away. Aborting the surrounding transaction therefore does
+     * not restore those graphs. Only the transaction's own graph, if any, is cleared as part of the transaction.
+     *
+     * @throws JenaTransactionException if called inside a READ transaction
+     */
+    @Override
+    public void clear() {
+        if (!isInTransaction()) _clear(graphs.values());
+        else switch (transactionType()) {
+        case READ:
+            throw new JenaTransactionException("Cannot clear during a READ transaction!");
+        case WRITE:
+            // avoid trying to nest a transaction on a graph we may have in hand
+            _clear(graphs.values().stream().filter(g -> !g.equals(graphInTransaction())).collect(toList()));
+            graphAction(LockableGraph::clear);
+        }
+    }
+
+    /**
+     * Clears each of {@code graphs} in its own WRITE transaction. All of the graphs are write-locked before any is
+     * cleared, and all are cleared before any is committed.
+     */
+    private static void _clear(Collection<LockableGraph> graphs) {
+        // Take a snapshot: a live view of the map could gain graphs between the passes below. Such a graph would then
+        // be committed or ended without ever having begun a transaction.
+        final Collection<LockableGraph> snapshot = graphs.stream().collect(toList());
+        try {
+            snapshot.forEach(LockableGraph::beginWrite);
+            snapshot.forEach(LockableGraph::clear);
+            snapshot.forEach(LockableGraph::commit);
+        } finally {
+            snapshot.forEach(LockableGraph::end);
+        }
+    }
+
+    /** Replaces the contents of the named graph with the triples of {@code triples}. */
+    @Override
+    public void addGraph(Node graphName, Graph triples) {
+        wholeGraphAction(graphName, g -> addInto(getGraph(g), triples));
+    }
+
+    /** Empties the named graph and then forgets it, so its name is no longer listed. */
+    @Override
+    public void removeGraph(Node graphName) {
+        wholeGraphAction(graphName, graphs::remove);
+    }
+
+    /** Clears the named graph (creating it if needed), then applies {@code action} to its name. */
+    private void wholeGraphAction(Node graphName, Consumer<Node> action) {
+        getGraph(graphName).clear();
+        action.accept(graphName);
+    }
+
+    /**
+     * A graph with a distinguished node, which we use as a name.
+     */
+    private abstract static class PointedGraph extends GraphBase {
+
+        private final Node distinguishedNode;
+
+        public PointedGraph(Node graphName) {
+            this.distinguishedNode = graphName;
+        }
+
+        public Node graphName() {
+            return distinguishedNode;
+        }
+    }
+
+    /**
+     * A {@link PointedGraph} that features a write-lock and supports transactions. If a find or a mutation is made
+     * outside a transaction, it is auto-wrapped in a transaction (READ or WRITE respectively).
+     */
+    private static class LockableGraph extends PointedGraph implements TransactionalComponent {
+
+        public LockableGraph(Node graphName) {
+            super(graphName);
+        }
+
+        /**
+         * We permit only one concurrent writer per named graph. The lock is fair, so it favours the longest-waiting
+         * writer.
+         */
+        private final Lock writeLock = new ReentrantLock(true);
+
+        /** The type of the current thread's transaction on this graph, or {@code null} if it has none. */
+        private final ThreadLocal<ReadWrite> currentTransactionType = new ThreadLocal<>();
+
+        private ReadWrite currentTransactionType() {
+            return currentTransactionType.get();
+        }
+
+        /** The transactional triple storage backing this graph. */
+        private final TriTable table = new TriTable();
+
+        @Override
+        protected ExtendedIterator<Triple> graphBaseFind(Triple t) {
+            if (currentTransactionType() == null) {
+                begin(READ);
+                try {
+                    // NOTE(review): the returned iterator outlives the READ transaction ended below. This relies on
+                    // TriTable.find having captured its view of the data before end() -- confirm.
+                    return _find(t);
+                } finally {
+                    end();
+                }
+            }
+            return _find(t);
+        }
+
+        /**
+         * Must be called inside a transaction!
+         *
+         * @param t the triple-pattern to search
+         * @return matches from the in-transaction table
+         */
+        private ExtendedIterator<Triple> _find(Triple t) {
+            return createNoRemove(table.find(t.getSubject(), t.getPredicate(), t.getObject()).iterator());
+        }
+
+        @Override
+        public void performAdd(Triple t) {
+            performMutation(t, table::add);
+        }
+
+        @Override
+        public void performDelete(Triple t) {
+            performMutation(t, table::delete);
+        }
+
+        /**
+         * Applies {@code action} to {@code t}. Outside a transaction it runs in its own committed WRITE transaction.
+         * Inside a WRITE transaction it runs directly.
+         *
+         * @throws JenaTransactionException if called inside a READ transaction
+         */
+        private void performMutation(Triple t, Consumer<Triple> action) {
+            final ReadWrite readWrite = currentTransactionType();
+            if (readWrite == null) {
+                begin(WRITE);
+                try {
+                    action.accept(t);
+                    commit();
+                } finally {
+                    end();
+                }
+            } else switch (readWrite) {
+            case READ:
+                throw new JenaTransactionException("Cannot write during a READ transaction!");
+            case WRITE:
+                action.accept(t);
+            }
+        }
+
+        @Override
+        protected int graphBaseSize() {
+            // TODO make this efficient, somehow
+            return super.graphBaseSize();
+        }
+
+        /**
+         * Begins a transaction on this graph for the current thread. A WRITE transaction first waits for this
+         * graph's write lock.
+         *
+         * @throws JenaTransactionException if the current thread already has a transaction on this graph
+         */
+        @Override
+        public void begin(ReadWrite readWrite) {
+            if (currentTransactionType() != null) throw new JenaTransactionException("Cannot nest transactions!");
+            if (WRITE.equals(readWrite)) writeLock.lock();
+            currentTransactionType.set(readWrite);
+            try {
+                table.begin(readWrite);
+            } catch (RuntimeException | Error e) {
+                // don't leave this graph locked, or marked as in-transaction, if the table refused to begin
+                finishTransaction();
+                throw e;
+            }
+        }
+
+        public void beginWrite() {
+            begin(WRITE);
+        }
+
+        @Override
+        public void commit() {
+            table.commit();
+            finishTransaction();
+        }
+
+        @Override
+        public void abort() {
+            table.abort();
+            finishTransaction();
+        }
+
+        @Override
+        public void end() {
+            try {
+                table.end();
+            } finally {
+                // release the write lock even if the table fails to end cleanly
+                finishTransaction();
+            }
+        }
+
+        /**
+         * Clears this thread's transaction marker and releases the write lock if the transaction was a WRITE. Calling
+         * it again (e.g. {@code end()} after {@code commit()}) finds no transaction type and so does not unlock twice.
+         */
+        private void finishTransaction() {
+            final ReadWrite readWrite = currentTransactionType();
+            currentTransactionType.remove();
+            if (WRITE.equals(readWrite)) writeLock.unlock();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/main/java/org/apache/jena/sparql/core/JenaTransactionRegionException.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/JenaTransactionRegionException.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/JenaTransactionRegionException.java
new file mode 100644
index 0000000..aaaef6c
--- /dev/null
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/JenaTransactionRegionException.java
@@ -0,0 +1,14 @@
+package org.apache.jena.sparql.core;
+
+import org.apache.jena.sparql.JenaTransactionException;
+
+/**
+ * Thrown when a transaction attempts to work outside its controlled region of data, for example when a transaction
+ * that is confined to one graph asks for another.
+ */
+public class JenaTransactionRegionException extends JenaTransactionException {
+
+    // exceptions are Serializable; pin the serialized form's version rather than relying on a computed one
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message a description of how the transaction strayed outside its region
+     */
+    public JenaTransactionRegionException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/TS_DatasetGraphPerGraphLocking.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/TS_DatasetGraphPerGraphLocking.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/TS_DatasetGraphPerGraphLocking.java
new file mode 100644
index 0000000..60e363e
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/TS_DatasetGraphPerGraphLocking.java
@@ -0,0 +1,11 @@
+package org.apache.jena.sparql.core;
+
+import org.apache.jena.sparql.core.pergraph.*;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({ BasicTest.class, FindPatternsTest.class, FindTest.class, LockTest.class,
TransactionLifecycleTest.class,
+		ViewTest.class, MultithreadingTest.class })
+public class TS_DatasetGraphPerGraphLocking {}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/BasicTest.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/BasicTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/BasicTest.java
new file mode 100644
index 0000000..770973b
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/BasicTest.java
@@ -0,0 +1,14 @@
+package org.apache.jena.sparql.core.pergraph;
+
+import org.apache.jena.sparql.core.AbstractDatasetGraphTests;
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking;
+
+/**
+ * Runs the {@link AbstractDatasetGraphTests} suite against {@link DatasetGraphPerGraphLocking}.
+ */
+public class BasicTest extends AbstractDatasetGraphTests {
+
+    /** @return a fresh, empty per-graph-locking dataset */
+    @Override
+    protected DatasetGraph emptyDataset() {
+        final DatasetGraph fresh = new DatasetGraphPerGraphLocking();
+        return fresh;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindPatternsTest.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindPatternsTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindPatternsTest.java
new file mode 100644
index 0000000..5934a9f
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindPatternsTest.java
@@ -0,0 +1,14 @@
+package org.apache.jena.sparql.core.pergraph;
+
+import org.apache.jena.sparql.core.AbstractDatasetGraphFindPatterns;
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking;
+
+/**
+ * Runs the {@link AbstractDatasetGraphFindPatterns} suite against {@link DatasetGraphPerGraphLocking}.
+ */
+public class FindPatternsTest extends AbstractDatasetGraphFindPatterns {
+
+    /** @return a new per-graph-locking dataset for each test */
+    @Override
+    protected DatasetGraph create() {
+        final DatasetGraph dsg = new DatasetGraphPerGraphLocking();
+        return dsg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindTest.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindTest.java
new file mode 100644
index 0000000..447dd98
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindTest.java
@@ -0,0 +1,14 @@
+package org.apache.jena.sparql.core.pergraph;
+
+import org.apache.jena.sparql.core.AbstractDatasetGraphFind;
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking;
+
+/**
+ * Runs the {@link AbstractDatasetGraphFind} suite against {@link DatasetGraphPerGraphLocking}.
+ */
+public class FindTest extends AbstractDatasetGraphFind {
+
+    /** @return a new per-graph-locking dataset for each test */
+    @Override
+    protected DatasetGraph create() {
+        final DatasetGraph dsg = new DatasetGraphPerGraphLocking();
+        return dsg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/LockTest.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/LockTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/LockTest.java
new file mode 100644
index 0000000..782fde5
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/LockTest.java
@@ -0,0 +1,16 @@
+package org.apache.jena.sparql.core.pergraph;
+
+import static org.apache.jena.query.DatasetFactory.wrap;
+
+import org.apache.jena.query.Dataset;
+import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking;
+import org.apache.jena.sparql.core.mem.TestDatasetGraphInMemoryLock;
+
+/**
+ * Runs the {@link TestDatasetGraphInMemoryLock} tests against a {@link Dataset} that wraps a
+ * {@link DatasetGraphPerGraphLocking}.
+ */
+public class LockTest extends TestDatasetGraphInMemoryLock {
+
+    /** @return a {@link Dataset} view over a fresh per-graph-locking dataset graph */
+    @Override
+    protected Dataset createDataset() {
+        final DatasetGraphPerGraphLocking dsg = new DatasetGraphPerGraphLocking();
+        return wrap(dsg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/MultithreadingTest.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/MultithreadingTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/MultithreadingTest.java
new file mode 100644
index 0000000..e99c372
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/MultithreadingTest.java
@@ -0,0 +1,75 @@
+package org.apache.jena.sparql.core.pergraph;
+
+import static com.jayway.awaitility.Awaitility.waitAtMost;
+import static com.jayway.awaitility.Duration.TEN_SECONDS;
+import static org.apache.jena.graph.NodeFactory.createLiteral;
+import static org.apache.jena.graph.NodeFactory.createURI;
+import static org.apache.jena.query.ReadWrite.READ;
+import static org.apache.jena.query.ReadWrite.WRITE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.jena.atlas.junit.BaseTest;
+import org.apache.jena.graph.Node;
+import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+public class MultithreadingTest extends BaseTest {
+
+    private static final Logger log = getLogger(MultithreadingTest.class);
+
+    private static final Node dummy = createURI("info:test");
+
+    private static final Node graph1 = createURI("info:graph1");
+
+    private static final Node graph2 = createURI("info:graph2");
+
+    @Test
+    public void loadTwoGraphsAtOnce() {
+        DatasetGraphPerGraphLocking dataset = new DatasetGraphPerGraphLocking();
+
+        // We start a thread loading a graph, then wait for the main thread to start loading
a different graph. The
+        // first thread must wait to see that the main thread has successfully started loading
its graph to finish its
+        // load. So when the first thread does finish, this proves that two graphs were being
loaded simultaneously.
+
+        AtomicBoolean startMain = new AtomicBoolean(), baton = new AtomicBoolean(), finishLine
= new AtomicBoolean();
+
+        new Thread(() -> {
+            dataset.begin(WRITE);
+            try {
+                dataset.add(graph1, dummy, dummy, createLiteral("before"));
+                // wait for the baton
+                startMain.set(true);
+                waitAtMost(TEN_SECONDS).untilTrue(baton);
+                dataset.add(graph1, dummy, dummy, createLiteral("after"));
+                dataset.commit();
+                finishLine.set(true);
+            } finally {
+                dataset.end();
+            }
+        }).start();
+
+        waitAtMost(TEN_SECONDS).untilTrue(startMain);
+        dataset.begin(WRITE);
+        try {
+            dataset.add(graph2, dummy, dummy, createLiteral("before"));
+            // pass the baton
+            baton.set(true);
+            dataset.add(graph2, dummy, dummy, createLiteral("after"));
+            dataset.commit();
+        } finally {
+            dataset.end();
+        }
+        waitAtMost(TEN_SECONDS).untilTrue(finishLine);
+        dataset.begin(READ);
+        try {
+            assertTrue("Failed to find the triple that proves that the first thread finished!",
+                    dataset.contains(graph1, dummy, dummy, createLiteral("after")));
+        } finally {
+            dataset.end();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TransactionLifecycleTest.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TransactionLifecycleTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TransactionLifecycleTest.java
new file mode 100644
index 0000000..65d91ff
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TransactionLifecycleTest.java
@@ -0,0 +1,22 @@
+package org.apache.jena.sparql.core.pergraph;
+
+import static org.apache.jena.query.DatasetFactory.wrap;
+
+import org.apache.jena.query.Dataset;
+import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking;
+import org.apache.jena.sparql.transaction.AbstractTestTransactionLifecycle;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Runs the {@link AbstractTestTransactionLifecycle} suite against {@link DatasetGraphPerGraphLocking}. The one
+ * exception is {@code transaction_concurrency_writer}, which is ignored because this dataset can have multiple
+ * writers.
+ */
+public class TransactionLifecycleTest extends AbstractTestTransactionLifecycle {
+
+    /** @return a {@link Dataset} view over a fresh per-graph-locking dataset graph */
+    @Override
+    protected Dataset create() {
+        final DatasetGraphPerGraphLocking dsg = new DatasetGraphPerGraphLocking();
+        return wrap(dsg);
+    }
+
+    /** Overridden only so that it can be ignored here; the body is intentionally empty. */
+    @Ignore("Block this test in the superclass because we can have multiple writers.")
+    @Override
+    @Test
+    public synchronized void transaction_concurrency_writer() {}
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/ViewTest.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/ViewTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/ViewTest.java
new file mode 100644
index 0000000..5bd5b05
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/ViewTest.java
@@ -0,0 +1,13 @@
+package org.apache.jena.sparql.core.pergraph;
+
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking;
+import org.apache.jena.sparql.core.TestDatasetGraphViewGraphs;
+
+/**
+ * Runs the {@link TestDatasetGraphViewGraphs} tests with a {@link DatasetGraphPerGraphLocking} as the base dataset.
+ */
+public class ViewTest extends TestDatasetGraphViewGraphs {
+
+    /** @return a fresh per-graph-locking dataset to build the views over */
+    @Override
+    protected DatasetGraph createBaseDSG() {
+        final DatasetGraph base = new DatasetGraphPerGraphLocking();
+        return base;
+    }
+}


Mime
View raw message