jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From resc...@apache.org
Subject svn commit: r1548211 - in /jackrabbit/oak/trunk/oak-core: pom.xml src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/ src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
Date Thu, 05 Dec 2013 17:24:33 GMT
Author: reschke
Date: Thu Dec  5 17:24:33 2013
New Revision: 1548211

URL: http://svn.apache.org/r1548211
Log:
OAK-1266 - work in progress SQL/JDBC DocumentStore implementation

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
  (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/pom.xml

Modified: jackrabbit/oak/trunk/oak-core/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/pom.xml?rev=1548211&r1=1548210&r2=1548211&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-core/pom.xml Thu Dec  5 17:24:33 2013
@@ -278,7 +278,7 @@
       <groupId>com.googlecode.json-simple</groupId>
       <artifactId>json-simple</artifactId>
       <version>1.1</version>
-      <scope>test</scope>
+      <!--<scope>test</scope> temporarily changed the scope for OAK-1266-->
     </dependency>
   </dependencies>
 </project>

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java?rev=1548211&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
(added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
Thu Dec  5 17:24:33 2013
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.sqlpersistence;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.oak.plugins.mongomk.Collection;
+import org.apache.jackrabbit.oak.plugins.mongomk.Document;
+import org.apache.jackrabbit.oak.plugins.mongomk.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.mongomk.Revision;
+import org.apache.jackrabbit.oak.plugins.mongomk.StableRevisionComparator;
+import org.apache.jackrabbit.oak.plugins.mongomk.UpdateOp;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+public class SQLDocumentStore implements DocumentStore {
+
+    public SQLDocumentStore() {
+        try {
+            initialize(new File("."));
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public <T extends Document> T find(Collection<T> collection, String id) {
+        return find(collection, id, 0);
+    }
+
+    @Override
+    public <T extends Document> T find(Collection<T> collection, String id, int
maxCacheAge) {
+        // TODO handle maxCacheAge
+        return readDocument(collection, id);
+    }
+
+    @Override
+    public <T extends Document> List<T> query(Collection<T> collection,
String fromKey, String toKey, int limit) {
+        return query(collection, fromKey, toKey, null, 0, 0);
+    }
+
+    @Override
+    public <T extends Document> List<T> query(Collection<T> collection,
String fromKey, String toKey, String indexedProperty,
+            long startValue, int limit) {
+        return internalQuery(collection, fromKey, toKey, indexedProperty, startValue, limit);
+    }
+
+    @Override
+    public <T extends Document> void remove(Collection<T> collection, String
id) {
+        delete(collection, id);
+    }
+
+    @Override
+    public <T extends Document> boolean create(Collection<T> collection, List<UpdateOp>
updateOps) {
+        return internalCreate(collection, updateOps);
+    }
+
+    @Override
+    public <T extends Document> void update(Collection<T> collection, List<String>
keys, UpdateOp updateOp) {
+        internalUpdate(collection, keys, updateOp);
+    }
+
+    @Override
+    public <T extends Document> T createOrUpdate(Collection<T> collection, UpdateOp
update) throws MicroKernelException {
+        return internalCreateOrUpdate(collection, update, false);
+    }
+
+    @Override
+    public <T extends Document> T findAndUpdate(Collection<T> collection, UpdateOp
update) throws MicroKernelException {
+        return internalCreateOrUpdate(collection, update, true);
+    }
+
+    @Override
+    public void invalidateCache() {
+        // TODO
+    }
+
+    @Override
+    public <T extends Document> void invalidateCache(Collection<T> collection,
String id) {
+        // TODO
+    }
+
+    @Override
+    public void dispose() {
+        try {
+            this.connection.close();
+            this.connection = null;
+        } catch (SQLException ex) {
+            throw new MicroKernelException(ex);
+        }
+    }
+
+    @Override
+    public <T extends Document> T getIfCached(Collection<T> collection, String
id) {
+        return null;
+    }
+
+    // implementation
+
+    private final Comparator<Revision> comparator = Collections.reverseOrder(new StableRevisionComparator());
+
+    private Connection connection;
+
+    private void initialize(File homeDir) throws Exception {
+        File dbDir = new File(homeDir, "db");
+        if (!dbDir.exists()) {
+            dbDir.mkdirs();
+        }
+
+        String jdbcuri = "jdbc:h2:" + dbDir.getCanonicalPath() + "/revs";
+        connection = DriverManager.getConnection(jdbcuri, "sa", "");
+        connection.setAutoCommit(false);
+        Statement stmt = connection.createStatement();
+        stmt.execute("drop table if exists CLUSTERNODES");
+        stmt.execute("drop table if exists NODES");
+        stmt.execute("create table if not exists CLUSTERNODES(ID varchar primary key, MODIFIED
bigint, DATA varchar)");
+        stmt.execute("create table if not exists NODES(ID varchar primary key, MODIFIED bigint,
DATA varchar)");
+    }
+
+    @CheckForNull
+    private <T extends Document> boolean internalCreate(Collection<T> collection,
List<UpdateOp> updates) {
+        try {
+            for (UpdateOp update : updates) {
+                T doc = collection.newDocument(this);
+                MemoryDocumentStore.applyChanges(doc, update, comparator);
+                writeDocument(collection, doc, true);
+            }
+            // FIXME to be atomic
+            return true;
+        } catch (MicroKernelException ex) {
+            return false;
+        }
+    }
+
+    @CheckForNull
+    private <T extends Document> T internalCreateOrUpdate(Collection<T> collection,
UpdateOp update, boolean checkConditions) {
+        T oldDoc = readDocument(collection, update.getId());
+
+        T doc = collection.newDocument(this);
+        if (oldDoc == null) {
+            if (!update.isNew()) {
+                throw new MicroKernelException("Document does not exist: " + update.getId());
+            }
+        } else {
+            oldDoc.deepCopy(doc);
+        }
+        if (checkConditions && !MemoryDocumentStore.checkConditions(doc, update))
{
+            return null;
+        }
+        MemoryDocumentStore.applyChanges(doc, update, comparator);
+        writeDocument(collection, doc, oldDoc == null);
+        doc.seal();
+
+        return oldDoc;
+    }
+
+    @CheckForNull
+    private <T extends Document> void internalUpdate(Collection<T> collection,
List<String> ids, UpdateOp updateOp) {
+        String tableName = getTable(collection);
+        try {
+            for (String id : ids) {
+                String in = dbRead(connection, tableName, id);
+                if (in == null) {
+                    throw new MicroKernelException(tableName + " " + id + " not found");
+                }
+                T doc = fromString(collection, in);
+                MemoryDocumentStore.applyChanges(doc, updateOp, comparator);
+                String data = asString(doc);
+                dbUpdate(connection, tableName, id, (Long) doc.get("_modified"), data);
+            }
+            connection.commit();
+        } catch (Exception ex) {
+            throw new MicroKernelException(ex);
+        }
+    }
+
+    private <T extends Document> List<T> internalQuery(Collection<T> collection,
String fromKey, String toKey,
+            String indexedProperty, long startValue, int limit) {
+        String tableName = getTable(collection);
+        List<T> result = new ArrayList<T>();
+        if (indexedProperty != null && !"_modified".equals(indexedProperty)) {
+            throw new RuntimeException("indexed property " + indexedProperty + " not supported");
+        }
+        try {
+            List<String> dbresult = dbQuery(connection, tableName, fromKey, toKey,
indexedProperty, startValue);
+            for (String data : dbresult) {
+                T doc = fromString(collection, data);
+                doc.seal();
+                result.add(doc);
+            }
+        } catch (Exception ex) {
+            throw new MicroKernelException(ex);
+        }
+        return result;
+    }
+
+    private static <T extends Document> String getTable(Collection<T> collection)
{
+        if (collection == Collection.CLUSTER_NODES) {
+            return "CLUSTERNODES";
+        } else if (collection == Collection.NODES) {
+            return "NODES";
+        } else {
+            throw new IllegalArgumentException("Unknown collection: " + collection.toString());
+        }
+    }
+
+    private static String asString(Document doc) {
+        JSONObject obj = new JSONObject();
+        for (String key : doc.keySet()) {
+            Object value = doc.get(key);
+            obj.put(key, value);
+        }
+        return obj.toJSONString();
+    }
+
+    private <T extends Document> T fromString(Collection<T> collection, String
data) throws ParseException {
+        T doc = collection.newDocument(this);
+        Map<String, Object> obj = (Map<String, Object>) new JSONParser().parse(data);
+        for (Map.Entry<String, Object> entry : obj.entrySet()) {
+            String key = entry.getKey();
+            Object value = entry.getValue();
+            if (value == null) {
+                // ???
+                doc.put(key, value);
+            } else if (value instanceof Boolean || value instanceof Long || value instanceof
String) {
+                doc.put(key, value);
+            } else if (value instanceof JSONObject) {
+                doc.put(key, convertJsonObject((JSONObject) value));
+            } else {
+                throw new RuntimeException("unexpected type: " + value.getClass());
+            }
+        }
+        return doc;
+    }
+
+    @Nonnull
+    private Map<Revision, Object> convertJsonObject(@Nonnull JSONObject obj) {
+        Map<Revision, Object> map = new TreeMap<Revision, Object>(comparator);
+        Set<Map.Entry> entries = obj.entrySet();
+        for (Map.Entry entry : entries) {
+            // not clear why every persisted map is a revision map
+            map.put(Revision.fromString(entry.getKey().toString()), entry.getValue());
+        }
+        return map;
+    }
+
+    @CheckForNull
+    private <T extends Document> T readDocument(Collection<T> collection, String
id) {
+        String tableName = getTable(collection);
+        try {
+            String in = dbRead(connection, tableName, id);
+            return in != null ? fromString(collection, in) : null;
+        } catch (Exception ex) {
+            throw new MicroKernelException(ex);
+        }
+    }
+
+    private <T extends Document> void delete(Collection<T> collection, String
id) {
+        String tableName = getTable(collection);
+        try {
+            dbDelete(connection, tableName, id);
+            connection.commit();
+        } catch (Exception ex) {
+            throw new MicroKernelException(ex);
+        }
+    }
+
+    private <T extends Document> void writeDocument(Collection<T> collection,
T document, boolean insert) {
+        String tableName = getTable(collection);
+        try {
+            String data = asString(document);
+            if (insert) {
+                dbInsert(connection, tableName, document.getId(), (Long) document.get("_modified"),
data);
+            } else {
+                dbUpdate(connection, tableName, document.getId(), (Long) document.get("_modified"),
data);
+            }
+            connection.commit();
+        } catch (SQLException ex) {
+            throw new MicroKernelException(ex);
+        }
+    }
+
+    // low level operations
+
+    @CheckForNull
+    private String dbRead(Connection connection, String tableName, String id) throws SQLException
{
+        PreparedStatement stmt = connection.prepareStatement("select DATA from " + tableName
+ " where ID = ?");
+        try {
+            stmt.setString(1, id);
+            ResultSet rs = stmt.executeQuery();
+            if (rs.next()) {
+                return rs.getString(1);
+            } else {
+                return null;
+            }
+        } finally {
+            stmt.close();
+        }
+    }
+
+    private List<String> dbQuery(Connection connection, String tableName, String minId,
String maxId, String indexedProperty,
+            long startValue) throws SQLException {
+        String t = "select DATA from " + tableName + " where ID > ? and ID < ?";
+        if (indexedProperty != null) {
+            t += " and MODIFIED >= ?";
+        }
+        PreparedStatement stmt = connection.prepareStatement(t);
+        List<String> result = new ArrayList<String>();
+        try {
+            stmt.setString(1, minId);
+            stmt.setString(2, maxId);
+            if (indexedProperty != null) {
+                stmt.setLong(3, startValue);
+            }
+            ResultSet rs = stmt.executeQuery();
+            while (rs.next()) {
+                String data = rs.getString(1);
+                result.add(data);
+            }
+        } finally {
+            stmt.close();
+        }
+        return result;
+    }
+
+    private void dbUpdate(Connection connection, String tableName, String id, Long modified,
String data) throws SQLException {
+        PreparedStatement stmt = connection.prepareStatement("UPDATE " + tableName + " SET
MODIFIED = ?, DATA = ? WHERE ID = ?");
+        try {
+            stmt.setObject(1, modified, Types.BIGINT);
+            stmt.setString(2, data);
+            stmt.setString(3, id);
+            stmt.executeUpdate();
+        } finally {
+            stmt.close();
+        }
+    }
+
+    private void dbInsert(Connection connection, String tableName, String id, Long modified,
String data) throws SQLException {
+        PreparedStatement stmt = connection.prepareStatement("INSERT INTO " + tableName +
" VALUES(?, ?, ?)");
+        try {
+            stmt.setString(1, id);
+            stmt.setObject(2, modified, Types.BIGINT);
+            stmt.setString(3, data);
+            stmt.executeUpdate();
+        } finally {
+            stmt.close();
+        }
+    }
+
+    private void dbDelete(Connection connection, String tableName, String id) throws SQLException
{
+        PreparedStatement stmt = connection.prepareStatement("delete from " + tableName +
" where ID = ?");
+        try {
+            stmt.setString(1, id);
+            stmt.executeUpdate();
+        } finally {
+            stmt.close();
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message