marmotta-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sschaff...@apache.org
Subject [1/2] git commit: - batched commits of triples, should dramatically improve performance
Date Mon, 17 Jun 2013 11:52:17 GMT
Updated Branches:
  refs/heads/develop 83cc90f51 -> 5def50274


- batched commits of triples, should dramatically improve performance


Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/c57bf422
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/c57bf422
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/c57bf422

Branch: refs/heads/develop
Commit: c57bf422f34abe7d749251467ca7a12a932454c2
Parents: 64cfc30
Author: Sebastian Schaffert <sschaffert@apache.org>
Authored: Mon Jun 17 13:51:08 2013 +0200
Committer: Sebastian Schaffert <sschaffert@apache.org>
Committed: Mon Jun 17 13:51:08 2013 +0200

----------------------------------------------------------------------
 .../kiwi/persistence/KiWiConnection.java        | 187 +++++++++++++++----
 .../marmotta/kiwi/persistence/KiWiDialect.java  |   6 +
 .../marmotta/kiwi/persistence/h2/H2Dialect.java |   5 +
 .../kiwi/persistence/mysql/MySQLDialect.java    |   5 +
 .../persistence/pgsql/PostgreSQLDialect.java    |   5 +
 .../marmotta/kiwi/model/caching/IntArray.java   |   4 +-
 .../kiwi/model/caching/TripleTable.java         |   2 +-
 7 files changed, 177 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/c57bf422/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
index 2b99ac2..a8dfe69 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
@@ -17,17 +17,16 @@
  */
 package org.apache.marmotta.kiwi.persistence;
 
+import info.aduna.iteration.*;
 import org.apache.marmotta.commons.sesame.model.LiteralCommons;
 import org.apache.marmotta.commons.sesame.model.Namespaces;
 import org.apache.marmotta.commons.util.DateUtils;
 import com.google.common.base.Preconditions;
-import info.aduna.iteration.CloseableIteration;
-import info.aduna.iteration.EmptyIteration;
-import info.aduna.iteration.ExceptionConvertingIteration;
 import net.sf.ehcache.Cache;
 import net.sf.ehcache.Element;
 import org.apache.commons.lang.LocaleUtils;
 import org.apache.marmotta.kiwi.caching.KiWiCacheManager;
+import org.apache.marmotta.kiwi.model.caching.TripleTable;
 import org.apache.marmotta.kiwi.model.rdf.*;
 import org.apache.marmotta.kiwi.persistence.util.ResultSetIteration;
 import org.apache.marmotta.kiwi.persistence.util.ResultTransformerFunction;
@@ -49,6 +48,7 @@ import java.util.HashSet;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * A KiWiConnection offers methods for storing and retrieving KiWiTriples, KiWiNodes, and KiWiNamespaces in the
@@ -70,6 +70,7 @@ public class KiWiConnection {
 
     protected KiWiCacheManager cacheManager;
 
+    protected TripleTable<KiWiTriple> tripleBatch;
 
     /**
      * Cache nodes by database ID
@@ -119,11 +120,18 @@ public class KiWiConnection {
 
     private boolean autoCommit = false;
 
+    private boolean batchCommit = true;
+
+    private int batchSize = 1000;
+
+    private ReentrantLock commitLock;
 
     public KiWiConnection(KiWiPersistence persistence, KiWiDialect dialect, KiWiCacheManager cacheManager) throws SQLException {
         this.cacheManager = cacheManager;
         this.dialect      = dialect;
         this.persistence  = persistence;
+        this.commitLock   = new ReentrantLock();
+        this.batchCommit  = dialect.isBatchSupported();
 
         initCachePool();
         initStatementCache();
@@ -163,6 +171,9 @@ public class KiWiConnection {
             connection = persistence.getJDBCConnection();
             connection.setAutoCommit(autoCommit);
         }
+        if(tripleBatch == null) {
+            tripleBatch = new TripleTable<KiWiTriple>();
+        }
     }
 
     /**
@@ -323,9 +334,9 @@ public class KiWiConnection {
         ResultSet result = querySize.executeQuery();
         try {
             if(result.next()) {
-                return result.getLong(1);
+                return result.getLong(1) + (tripleBatch != null ? tripleBatch.size() : 0);
             } else {
-                return 0;
+                return 0  + (tripleBatch != null ? tripleBatch.size() : 0);
             }
         } finally {
             result.close();
@@ -350,9 +361,9 @@ public class KiWiConnection {
         ResultSet result = querySize.executeQuery();
         try {
             if(result.next()) {
-                return result.getLong(1);
+                return result.getLong(1) + (tripleBatch != null ? tripleBatch.listTriples(null,null,null,context).size() : 0);
             } else {
-                return 0;
+                return 0 + (tripleBatch != null ? tripleBatch.listTriples(null,null,null,context).size() : 0);
             }
         } finally {
             result.close();
@@ -934,29 +945,38 @@ public class KiWiConnection {
 
         requireJDBCConnection();
 
-        try {
-            // retrieve a new triple ID and set it in the object
-            if(triple.getId() == null) {
-                triple.setId(getNextSequence("seq.triples"));
-            }
-
-            PreparedStatement insertTriple = getPreparedStatement("store.triple");
-            insertTriple.setLong(1,triple.getId());
-            insertTriple.setLong(2,triple.getSubject().getId());
-            insertTriple.setLong(3,triple.getPredicate().getId());
-            insertTriple.setLong(4,triple.getObject().getId());
-            insertTriple.setLong(5,triple.getContext().getId());
-            insertTriple.setBoolean(6,triple.isInferred());
-            insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
-            int count = insertTriple.executeUpdate();
+        // retrieve a new triple ID and set it in the object
+        if(triple.getId() == null) {
+            triple.setId(getNextSequence("seq.triples"));
+        }
 
+        if(batchCommit) {
             cacheTriple(triple);
-
-            return count > 0;
-        } catch(SQLException ex) {
-            // this is an ugly hack to catch duplicate key errors in some databases (H2)
-            // better option could be http://stackoverflow.com/questions/6736518/h2-java-insert-ignore-allow-exception
-            return false;
+            boolean result = tripleBatch.add(triple);
+            if(tripleBatch.size() >= batchSize) {
+                flushBatch();
+            }
+            return result;
+        }  else {
+            try {
+                PreparedStatement insertTriple = getPreparedStatement("store.triple");
+                insertTriple.setLong(1,triple.getId());
+                insertTriple.setLong(2,triple.getSubject().getId());
+                insertTriple.setLong(3,triple.getPredicate().getId());
+                insertTriple.setLong(4,triple.getObject().getId());
+                insertTriple.setLong(5,triple.getContext().getId());
+                insertTriple.setBoolean(6,triple.isInferred());
+                insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
+                int count = insertTriple.executeUpdate();
+
+                cacheTriple(triple);
+
+                return count > 0;
+            } catch(SQLException ex) {
+                // this is an ugly hack to catch duplicate key errors in some databases (H2)
+                // better option could be http://stackoverflow.com/questions/6736518/h2-java-insert-ignore-allow-exception
+                return false;
+            }
         }
     }
 
@@ -987,6 +1007,10 @@ public class KiWiConnection {
         // make sure the triple is marked as deleted in case some service still holds a reference
         triple.setDeleted(true);
         triple.setDeletedAt(new Date());
+
+        if(tripleBatch != null) {
+            tripleBatch.remove(triple);
+        }
     }
 
     /**
@@ -1161,13 +1185,25 @@ public class KiWiConnection {
 
         final ResultSet result = query.executeQuery();
 
+        if(tripleBatch != null && tripleBatch.size() > 0) {
+            return new UnionIteration<Statement, SQLException>(
+                    new IteratorIteration<Statement, SQLException>(tripleBatch.listTriples(subject,predicate,object,context).iterator()),
+                    new ResultSetIteration<Statement>(result, true, new ResultTransformerFunction<Statement>() {
+                        @Override
+                        public Statement apply(ResultSet row) throws SQLException {  // could be lazy without even asking the database
+                            return constructTripleFromDatabase(result);
+                        }
+                    })
+            );
+        }  else {
+            return new ResultSetIteration<Statement>(result, true, new ResultTransformerFunction<Statement>() {
+                @Override
+                public Statement apply(ResultSet row) throws SQLException {
+                    return constructTripleFromDatabase(result);
+                }
+            });
+        }
 
-        return new ResultSetIteration<Statement>(result, true, new ResultTransformerFunction<Statement>() {
-            @Override
-            public Statement apply(ResultSet row) throws SQLException {
-                return constructTripleFromDatabase(result);
-            }
-        });
     }
 
     /**
@@ -1565,6 +1601,47 @@ public class KiWiConnection {
     }
 
     /**
+     * Return true if batched commits are enabled. Batched commits will try to group database operations and
+     * keep a memory log while storing triples. This can considerably improve the database performance.
+     * @return
+     */
+    public boolean isBatchCommit() {
+        return batchCommit;
+    }
+
+    /**
+     * Enabled batched commits. Batched commits will try to group database operations and
+     * keep a memory log while storing triples. This can considerably improve the database performance.
+     * @return
+     */
+    public void setBatchCommit(boolean batchCommit) {
+        if(dialect.isBatchSupported()) {
+            this.batchCommit = batchCommit;
+        } else {
+            log.warn("batch commits are not supported by this database dialect");
+        }
+    }
+
+
+    /**
+     * Return the size of a batch for batched commits. Batched commits will try to group database operations and
+     * keep a memory log while storing triples. This can considerably improve the database performance.
+     * @return
+     */
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    /**
+     * Set the size of a batch for batched commits. Batched commits will try to group database operations and
+     * keep a memory log while storing triples. This can considerably improve the database performance.
+     * @param batchSize
+     */
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    /**
      * Makes all changes made since the previous
      * commit/rollback permanent and releases any database locks
      * currently held by this <code>Connection</code> object.
@@ -1578,6 +1655,9 @@ public class KiWiConnection {
      * @see #setAutoCommit
      */
     public void commit() throws SQLException {
+        if(tripleBatch != null && tripleBatch.size() > 0) {
+            flushBatch();
+        }
         if(connection != null) {
             connection.commit();
         }
@@ -1596,6 +1676,9 @@ public class KiWiConnection {
      * @see #setAutoCommit
      */
     public void rollback() throws SQLException {
+        if(tripleBatch != null && tripleBatch.size() > 0) {
+            tripleBatch.clear();
+        }
         if(connection != null && !connection.isClosed()) {
             connection.rollback();
         }
@@ -1658,4 +1741,40 @@ public class KiWiConnection {
             connection.close();
         }
     }
+
+
+    private void flushBatch() throws SQLException {
+        if(batchCommit) {
+            requireJDBCConnection();
+
+            commitLock.lock();
+            try {
+                PreparedStatement insertTriple = getPreparedStatement("store.triple");
+                insertTriple.clearParameters();
+                for(KiWiTriple triple : tripleBatch) {
+                    // retrieve a new triple ID and set it in the object
+                    if(triple.getId() == null) {
+                        triple.setId(getNextSequence("seq.triples"));
+                        log.warn("the batched triple did not have an ID");
+                    }
+
+                    insertTriple.setLong(1,triple.getId());
+                    insertTriple.setLong(2,triple.getSubject().getId());
+                    insertTriple.setLong(3,triple.getPredicate().getId());
+                    insertTriple.setLong(4,triple.getObject().getId());
+                    insertTriple.setLong(5,triple.getContext().getId());
+                    insertTriple.setBoolean(6,triple.isInferred());
+                    insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
+
+                    insertTriple.addBatch();
+                }
+                insertTriple.executeBatch();
+                tripleBatch.clear();
+            }  finally {
+                commitLock.unlock();
+            }
+
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/c57bf422/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
index 0c9b2a8..bd4879b 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
@@ -69,6 +69,12 @@ public abstract class KiWiDialect {
 
 
     /**
+     * Return true if batched commits are supported by this dialect.
+     * @return
+     */
+    public abstract boolean isBatchSupported();
+
+    /**
      * Return the contents of the SQL create script used for initialising an empty database
      * @return
      */

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/c57bf422/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/h2/H2Dialect.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/h2/H2Dialect.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/h2/H2Dialect.java
index 7a1c1ee..89c1971 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/h2/H2Dialect.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/h2/H2Dialect.java
@@ -46,6 +46,11 @@ public class H2Dialect extends KiWiDialect {
     }
 
     @Override
+    public boolean isBatchSupported() {
+        return false;
+    }
+
+    @Override
     public String getRegexp(String text, String pattern) {
         return text + " REGEXP " + pattern;
     }

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/c57bf422/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/mysql/MySQLDialect.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/mysql/MySQLDialect.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/mysql/MySQLDialect.java
index 7d147fa..c6fa1b6 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/mysql/MySQLDialect.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/mysql/MySQLDialect.java
@@ -57,6 +57,11 @@ public class MySQLDialect extends KiWiDialect {
     }
 
     @Override
+    public boolean isBatchSupported() {
+        return true;
+    }
+
+    @Override
     public String getRegexp(String text, String pattern) {
         return text + " RLIKE " + pattern;
     }

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/c57bf422/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/pgsql/PostgreSQLDialect.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/pgsql/PostgreSQLDialect.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/pgsql/PostgreSQLDialect.java
index d9ebd4a..ea7e033 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/pgsql/PostgreSQLDialect.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/pgsql/PostgreSQLDialect.java
@@ -47,6 +47,11 @@ public class PostgreSQLDialect extends KiWiDialect {
     }
 
     @Override
+    public boolean isBatchSupported() {
+        return true;
+    }
+
+    @Override
     public String getRegexp(String text, String pattern) {
         return text + " ~ " + pattern;
     }

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/c57bf422/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/IntArray.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/IntArray.java b/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/IntArray.java
index f870611..594e2e1 100644
--- a/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/IntArray.java
+++ b/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/IntArray.java
@@ -72,7 +72,7 @@ public final class IntArray implements Comparable<IntArray> {
 
     }
 
-    public static final IntArray createSPOCMaxKey(Resource subject, URI property, Value object, URI context){
+    public static final IntArray createSPOCMaxKey(Resource subject, URI property, Value object, Resource context){
 
         // the cache key is generated by appending the bytes of the hashcodes of subject, property, object, context and inferred and
         // storing them as a BigInteger; generating the cache key should thus be very efficient
@@ -112,7 +112,7 @@ public final class IntArray implements Comparable<IntArray> {
 
     }
 
-    public static final IntArray createCSPOMaxKey(Resource subject, URI property, Value object, URI context){
+    public static final IntArray createCSPOMaxKey(Resource subject, URI property, Value object, Resource context){
 
         // the cache key is generated by appending the bytes of the hashcodes of subject, property, object, context and inferred and
         // storing them as a BigInteger; generating the cache key should thus be very efficient

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/c57bf422/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/TripleTable.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/TripleTable.java b/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/TripleTable.java
index 8b40ca5..6b0ec57 100644
--- a/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/TripleTable.java
+++ b/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/TripleTable.java
@@ -400,7 +400,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
      * @param context
      * @return
      */
-    public Collection<Triple> listTriples(final Resource subject, final URI property, final Value object, final URI context) {
+    public Collection<Triple> listTriples(final Resource subject, final URI property, final Value object, final Resource context) {
         // in special cases we can make use of the index
         if(subject != null && property != null && object != null && context != null) {
             IntArray key = IntArray.createSPOCKey(subject, property, object, context);


Mime
View raw message