jackrabbit-oak-commits mailing list archives

From resc...@apache.org
Subject svn commit: r1568240 [2/4] - in /jackrabbit/oak/trunk: oak-auth-external/src/main/java/org/apache/jackrabbit/oak/spi/security/authentication/external/ oak-auth-external/src/main/java/org/apache/jackrabbit/oak/spi/security/authentication/external/impl/ ...
Date Fri, 14 Feb 2014 11:21:34 GMT
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreConfiguration.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreConfiguration.java?rev=1568240&r1=1568239&r2=1568240&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreConfiguration.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreConfiguration.java Fri Feb 14 11:21:31 2014
@@ -1,176 +1,176 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.blob;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import org.osgi.framework.BundleContext;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Defines the configuration needed by a BlobStore.
- */
-public class BlobStoreConfiguration {
-
-    public static final String PRIMARY_DATA_STORE = "primary";
-
-    public static final String ARCHIVE_DATA_STORE = "archive";
-
-    public static final String PROP_DATA_STORE = "dataStoreProvider";
-
-    public static final String PROP_BLOB_STORE_PROVIDER = "blobStoreProvider";
-
-    public static final String DEFAULT_BLOB_STORE_PROVIDER = "";
-
-    private Map<String, String> configMap;
-
-    private Set<String> propKeys;
-
-    /**
-     * Instantiates a new data store configuration.
-     */
-    private BlobStoreConfiguration() {
-        configMap = Maps.newHashMap();
-        propKeys = Sets.newHashSet();
-
-        Properties props = new Properties();
-        try {
-            props.load(this.getClass().getResourceAsStream("blobstore.properties"));
-        } catch (IOException e) {
-            // ignore: fall back to an empty set of default properties
-        }
-
-        // populate keys from the default set
-        Map<String, String> defaultMap = Maps.fromProperties(props);
-        Iterator<String> iter = defaultMap.keySet().iterator();
-        while (iter.hasNext()) {
-            propKeys.add(iter.next());
-        }
-
-        // Remove empty default properties from the map
-        getConfigMap().putAll(
-                Maps.filterValues(defaultMap, new Predicate<String>() {
-                    @Override
-                    public boolean apply(@Nullable String input) {
-                        if ((input == null) || input.trim().length() == 0) {
-                            return false;
-                        }
-                        return true;
-                    }
-                }));
-    }
-
-    /**
-     * Creates a new configuration object with default values.
-     * 
-     * @return the data store configuration
-     */
-    public static BlobStoreConfiguration newInstance() {
-        return new BlobStoreConfiguration();
-    }
-
-    /**
-     * Load configuration from the system props.
-     * 
-     * @return the configuration
-     */
-    public BlobStoreConfiguration loadFromSystemProps() {
-        // filter out standard JVM properties to keep the map small
-        getConfigMap().putAll(Maps.filterKeys(Maps.fromProperties(System.getProperties()), new Predicate<String>() {
-            @Override
-            public boolean apply(@Nullable String input) {
-                if (input.startsWith("java.") || input.startsWith("sun.") || input.startsWith("user.")
-                        || input.startsWith("file.") || input.startsWith("line.") || input.startsWith("os.")
-                        || input.startsWith("awt.") || input.startsWith("path.")) {
-                    return false;
-                } else {
-                    return true;
-                }
-            }
-        }));
-        return this;
-    }
-
-    /**
-     * Load configuration from a map.
-     * 
-     * @param cfgMap
-     *            the configuration map
-     * @return the configuration
-     */
-    @SuppressWarnings("unchecked")
-    public BlobStoreConfiguration loadFromMap(Map<String, ?> cfgMap) {
-        getConfigMap().putAll((Map<? extends String, ? extends String>) cfgMap);
-        loadFromSystemProps();
-
-        return this;
-    }
-
-    /**
-     * Load configuration from the given map, overridden by any matching BundleContext properties.
-     * 
-     * @param map
-     *            the map
-     * @param context
-     *            the context
-     * @return the configuration
-     */
-    public BlobStoreConfiguration loadFromContextOrMap(Map<String, ?> map, BundleContext context) {
-        loadFromMap(map);
-
-        // properties found in the BundleContext override those from the map
-        for (String key : getPropKeys()) {
-            if (context != null && context.getProperty(key) != null) {
-                addProperty(key, context.getProperty(key));
-            }
-        }
-        return this;
-    }
-
-    public String getProperty(String key) {
-        return getConfigMap().get(key);
-    }
-
-    public void addProperty(String key, String val) {
-        getConfigMap().put(key, val);
-    }
-
-    public Map<String, String> getConfigMap() {
-        return configMap;
-    }
-
-    public void setConfigMap(Map<String, String> configMap) {
-        this.configMap = configMap;
-    }
-
-    public Set<String> getPropKeys() {
-        return propKeys;
-    }
-
-    public void setPropKeys(Set<String> propKeys) {
-        this.propKeys = propKeys;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.osgi.framework.BundleContext;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Defines the configuration needed by a BlobStore.
+ */
+public class BlobStoreConfiguration {
+
+    public static final String PRIMARY_DATA_STORE = "primary";
+
+    public static final String ARCHIVE_DATA_STORE = "archive";
+
+    public static final String PROP_DATA_STORE = "dataStoreProvider";
+
+    public static final String PROP_BLOB_STORE_PROVIDER = "blobStoreProvider";
+
+    public static final String DEFAULT_BLOB_STORE_PROVIDER = "";
+
+    private Map<String, String> configMap;
+
+    private Set<String> propKeys;
+
+    /**
+     * Instantiates a new data store configuration.
+     */
+    private BlobStoreConfiguration() {
+        configMap = Maps.newHashMap();
+        propKeys = Sets.newHashSet();
+
+        Properties props = new Properties();
+        try {
+            props.load(this.getClass().getResourceAsStream("blobstore.properties"));
+        } catch (IOException e) {
+            // ignore: fall back to an empty set of default properties
+        }
+
+        // populate keys from the default set
+        Map<String, String> defaultMap = Maps.fromProperties(props);
+        Iterator<String> iter = defaultMap.keySet().iterator();
+        while (iter.hasNext()) {
+            propKeys.add(iter.next());
+        }
+
+        // Remove empty default properties from the map
+        getConfigMap().putAll(
+                Maps.filterValues(defaultMap, new Predicate<String>() {
+                    @Override
+                    public boolean apply(@Nullable String input) {
+                        if ((input == null) || input.trim().length() == 0) {
+                            return false;
+                        }
+                        return true;
+                    }
+                }));
+    }
+
+    /**
+     * Creates a new configuration object with default values.
+     * 
+     * @return the data store configuration
+     */
+    public static BlobStoreConfiguration newInstance() {
+        return new BlobStoreConfiguration();
+    }
+
+    /**
+     * Load configuration from the system props.
+     * 
+     * @return the configuration
+     */
+    public BlobStoreConfiguration loadFromSystemProps() {
+        // filter out standard JVM properties to keep the map small
+        getConfigMap().putAll(Maps.filterKeys(Maps.fromProperties(System.getProperties()), new Predicate<String>() {
+            @Override
+            public boolean apply(@Nullable String input) {
+                if (input.startsWith("java.") || input.startsWith("sun.") || input.startsWith("user.")
+                        || input.startsWith("file.") || input.startsWith("line.") || input.startsWith("os.")
+                        || input.startsWith("awt.") || input.startsWith("path.")) {
+                    return false;
+                } else {
+                    return true;
+                }
+            }
+        }));
+        return this;
+    }
+
+    /**
+     * Load configuration from a map.
+     * 
+     * @param cfgMap
+     *            the configuration map
+     * @return the configuration
+     */
+    @SuppressWarnings("unchecked")
+    public BlobStoreConfiguration loadFromMap(Map<String, ?> cfgMap) {
+        getConfigMap().putAll((Map<? extends String, ? extends String>) cfgMap);
+        loadFromSystemProps();
+
+        return this;
+    }
+
+    /**
+     * Load configuration from the given map, overridden by any matching BundleContext properties.
+     * 
+     * @param map
+     *            the map
+     * @param context
+     *            the context
+     * @return the configuration
+     */
+    public BlobStoreConfiguration loadFromContextOrMap(Map<String, ?> map, BundleContext context) {
+        loadFromMap(map);
+
+        // properties found in the BundleContext override those from the map
+        for (String key : getPropKeys()) {
+            if (context != null && context.getProperty(key) != null) {
+                addProperty(key, context.getProperty(key));
+            }
+        }
+        return this;
+    }
+
+    public String getProperty(String key) {
+        return getConfigMap().get(key);
+    }
+
+    public void addProperty(String key, String val) {
+        getConfigMap().put(key, val);
+    }
+
+    public Map<String, String> getConfigMap() {
+        return configMap;
+    }
+
+    public void setConfigMap(Map<String, String> configMap) {
+        this.configMap = configMap;
+    }
+
+    public Set<String> getPropKeys() {
+        return propKeys;
+    }
+
+    public void setPropKeys(Set<String> propKeys) {
+        this.propKeys = propKeys;
+    }
+}

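For reference, a minimal usage sketch of the configuration class above: it layers an illustrative map over the defaults from blobstore.properties and reads one of the declared keys back. The builder class name used as the value is only a placeholder choice, not something the class mandates.

    import java.util.Map;

    import org.apache.jackrabbit.oak.plugins.blob.BlobStoreConfiguration;

    import com.google.common.collect.Maps;

    public class BlobStoreConfigurationExample {
        public static void main(String[] args) {
            // Defaults from blobstore.properties are loaded first; this map
            // is layered on top, and non-JVM system properties win last.
            Map<String, String> cfg = Maps.newHashMap();
            // Illustrative value: any BlobStoreBuilder implementation name.
            cfg.put(BlobStoreConfiguration.PROP_BLOB_STORE_PROVIDER,
                    "org.apache.jackrabbit.oak.plugins.blob.cloud.CloudBlobStoreBuilder");

            BlobStoreConfiguration config =
                    BlobStoreConfiguration.newInstance().loadFromMap(cfg);
            System.out.println(config.getProperty(
                    BlobStoreConfiguration.PROP_BLOB_STORE_PROVIDER));
        }
    }

Note that loadFromMap() calls loadFromSystemProps() internally, so any system properties other than the filtered JVM ones end up in the map as well.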
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java?rev=1568240&r1=1568239&r2=1568240&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java Fri Feb 14 11:21:31 2014
@@ -1,304 +1,304 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.blob.cloud;
-
-import static org.jclouds.blobstore.options.ListContainerOptions.Builder.maxResults;
-import static org.jclouds.blobstore.options.PutOptions.Builder.multipart;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.io.ByteStreams;
-
-import org.apache.jackrabbit.mk.blobs.AbstractBlobStore;
-import org.apache.jackrabbit.mk.util.StringUtils;
-import org.jclouds.ContextBuilder;
-import org.jclouds.blobstore.BlobStoreContext;
-import org.jclouds.blobstore.domain.Blob;
-import org.jclouds.blobstore.domain.PageSet;
-import org.jclouds.blobstore.domain.StorageMetadata;
-import org.jclouds.io.Payload;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of the {@link BlobStore} to store blobs in a cloud blob store.
- * <p>
- * Extends {@link AbstractBlobStore} and breaks the binary into chunks for easier management.
- */
-public class CloudBlobStore extends AbstractBlobStore {
-    /**
-     * Logger instance.
-     */
-    private static final Logger LOG = LoggerFactory.getLogger(CloudBlobStore.class);
-
-    /** Cloud Store context */
-    private BlobStoreContext context;
-
-    /** The bucket. */
-    private String cloudContainer;
-
-    private String accessKey;
-
-    private String secretKey;
-
-    private String cloudProvider;
-
-    protected String getCloudContainer() {
-        return cloudContainer;
-    }
-
-    public void setCloudContainer(String cloudContainer) {
-        this.cloudContainer = cloudContainer;
-    }
-
-    public String getAccessKey() {
-        return accessKey;
-    }
-
-    public void setAccessKey(String accessKey) {
-        this.accessKey = accessKey;
-    }
-
-    public String getSecretKey() {
-        return secretKey;
-    }
-
-    public void setSecretKey(String secretKey) {
-        this.secretKey = secretKey;
-    }
-
-    public String getCloudProvider() {
-        return cloudProvider;
-    }
-
-    public void setCloudProvider(String cloudProvider) {
-        this.cloudProvider = cloudProvider;
-    }
-
-    /**
-     * Instantiates a connection to the cloud blob store. The following
-     * properties must be set through the setters before calling this method:
-     * <ul>
-     * <li>cloudProvider - the jclouds provider id</li>
-     * <li>accessKey - the access key</li>
-     * <li>secretKey - the secret key</li>
-     * <li>cloudContainer - the container (bucket) to use</li>
-     * </ul>
-     * 
-     * @throws Exception
-     *             if the context or the container could not be created
-     */
-    public void init() throws Exception {
-        try {
-            this.context =
-                    ContextBuilder.newBuilder(cloudProvider)
-                            .credentials(accessKey, secretKey)
-                            .buildView(BlobStoreContext.class);
-            context.getBlobStore().createContainerInLocation(null, cloudContainer);
-
-            LOG.info("Using container : " + cloudContainer);
-        } catch (Exception e) {
-            LOG.error("Error creating CloudBlobStore : ", e);
-            throw e;
-        }
-    }
-
-    /**
-     * Uploads the block to the cloud service.
-     */
-    @Override
-    protected void storeBlock(byte[] digest, int level, byte[] data) throws IOException {
-        Preconditions.checkNotNull(context);
-
-        String id = StringUtils.convertBytesToHex(digest);
-
-        org.jclouds.blobstore.BlobStore blobStore = context.getBlobStore();
-
-        if (!blobStore.blobExists(cloudContainer, id)) {
-            Map<String, String> metadata = Maps.newHashMap();
-            metadata.put("level", String.valueOf(level));
-
-            Blob blob = blobStore.blobBuilder(id)
-                    .payload(data)
-                    .userMetadata(metadata)
-                    .build();
-            String etag = blobStore.putBlob(cloudContainer, blob, multipart());
-            LOG.debug("Blob " + id + " created with cloud tag : " + etag);
-        } else {
-            LOG.debug("Blob " + id + " already exists");
-        }
-    }
-
-    /**
-     * Reads the data from the actual cloud service.
-     */
-    @Override
-    protected byte[] readBlockFromBackend(BlockId blockId) throws Exception {
-        Preconditions.checkNotNull(context);
-
-        String id = StringUtils.convertBytesToHex(blockId.getDigest());
-
-        Blob cloudBlob = context.getBlobStore().getBlob(cloudContainer, id);
-        if (cloudBlob == null) {
-            String message = "Did not find block " + id;
-            LOG.error(message);
-            throw new IOException(message);
-        }
-
-        Payload payload = cloudBlob.getPayload();
-        try {
-            byte[] data = ByteStreams.toByteArray(payload.getInput());
-
-            if (blockId.getPos() == 0) {
-                return data;
-            }
-
-            int len = (int) (data.length - blockId.getPos());
-            if (len < 0) {
-                return new byte[0];
-            }
-            byte[] d2 = new byte[len];
-            System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
-            return d2;
-        } finally {
-            payload.close();
-        }
-    }
-
-    /**
-     * Deletes the cloud container with all its contents, and closes
-     * the blob store context.
-     */
-    public void deleteBucket() {
-        Preconditions.checkNotNull(context);
-
-        if (context.getBlobStore().containerExists(cloudContainer)) {
-            context.getBlobStore().deleteContainer(cloudContainer);
-        }
-        context.close();
-    }
-
-    @Override
-    public void startMark() throws IOException {
-        // No-op
-    }
-
-    @Override
-    protected void mark(BlockId id) throws Exception {
-        // No-op
-    }
-
-    @Override
-    public int sweep() throws IOException {
-        return 0;
-    }
-
-    @Override
-    protected boolean isMarkEnabled() {
-        return false;
-    }
-
-    @Override
-    public Iterator<String> getAllChunkIds(
-            long maxLastModifiedTime) throws Exception {
-        Preconditions.checkNotNull(context);
-
-        final org.jclouds.blobstore.BlobStore blobStore = context.getBlobStore();
-        return new CloudStoreIterator(blobStore, maxLastModifiedTime);
-    }
-
-    @Override
-    public boolean deleteChunk(String chunkId) throws Exception {
-        Preconditions.checkNotNull(context);
-
-        final org.jclouds.blobstore.BlobStore blobStore = context.getBlobStore();
-        blobStore.removeBlob(cloudContainer, chunkId);
-
-        return true;
-    }
-
-    class CloudStoreIterator implements Iterator<String> {
-        static final int BATCH = 1000;
-
-        org.jclouds.blobstore.BlobStore store;
-        long maxLastModifiedTime;
-
-        PageSet<? extends StorageMetadata> set;
-        ArrayDeque<String> queue;
-
-        public CloudStoreIterator(org.jclouds.blobstore.BlobStore store,
-                long maxLastModifiedTime) {
-            this.store = store;
-            this.maxLastModifiedTime = maxLastModifiedTime;
-            this.queue = new ArrayDeque<String>(BATCH);
-        }
-
-        @Override
-        public boolean hasNext() {
-            if (set == null) {
-                set = store.list(cloudContainer, maxResults(BATCH));
-                loadElements(set);
-            }
-
-            if (!queue.isEmpty()) {
-                return true;
-            } else if (set.getNextMarker() != null) {
-                set = store.list(cloudContainer,
-                        maxResults(BATCH).afterMarker(set.getNextMarker()));
-                loadElements(set);
-
-                if (!queue.isEmpty()) {
-                    return true;
-                }
-            }
-
-            return false;
-        }
-
-        private void loadElements(PageSet<? extends StorageMetadata> set) {
-            Iterator<? extends StorageMetadata> iter = set.iterator();
-            while (iter.hasNext()) {
-                StorageMetadata metadata = iter.next();
-                // only queue chunks modified on or before maxLastModifiedTime;
-                // a value of 0 or -1 disables the time limit
-                if ((maxLastModifiedTime == 0 || maxLastModifiedTime == -1) ||
-                        (metadata.getLastModified().getTime() <= maxLastModifiedTime)) {
-                    queue.add(metadata.getName());
-                }
-            }
-        }
-
-        @Override
-        public String next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException("No more elements");
-            }
-            return queue.poll();
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.cloud;
+
+import static org.jclouds.blobstore.options.ListContainerOptions.Builder.maxResults;
+import static org.jclouds.blobstore.options.PutOptions.Builder.multipart;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.io.ByteStreams;
+
+import org.apache.jackrabbit.mk.blobs.AbstractBlobStore;
+import org.apache.jackrabbit.mk.util.StringUtils;
+import org.jclouds.ContextBuilder;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.domain.PageSet;
+import org.jclouds.blobstore.domain.StorageMetadata;
+import org.jclouds.io.Payload;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of the {@link BlobStore} to store blobs in a cloud blob store.
+ * <p>
+ * Extends {@link AbstractBlobStore} and breaks the binary into chunks for easier management.
+ */
+public class CloudBlobStore extends AbstractBlobStore {
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(CloudBlobStore.class);
+
+    /** Cloud Store context */
+    private BlobStoreContext context;
+
+    /** The bucket. */
+    private String cloudContainer;
+
+    private String accessKey;
+
+    private String secretKey;
+
+    private String cloudProvider;
+
+    protected String getCloudContainer() {
+        return cloudContainer;
+    }
+
+    public void setCloudContainer(String cloudContainer) {
+        this.cloudContainer = cloudContainer;
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    public String getCloudProvider() {
+        return cloudProvider;
+    }
+
+    public void setCloudProvider(String cloudProvider) {
+        this.cloudProvider = cloudProvider;
+    }
+
+    /**
+     * Instantiates a connection to the cloud blob store. The following
+     * properties must be set through the setters before calling this method:
+     * <ul>
+     * <li>cloudProvider - the jclouds provider id</li>
+     * <li>accessKey - the access key</li>
+     * <li>secretKey - the secret key</li>
+     * <li>cloudContainer - the container (bucket) to use</li>
+     * </ul>
+     * 
+     * @throws Exception
+     *             if the context or the container could not be created
+     */
+    public void init() throws Exception {
+        try {
+            this.context =
+                    ContextBuilder.newBuilder(cloudProvider)
+                            .credentials(accessKey, secretKey)
+                            .buildView(BlobStoreContext.class);
+            context.getBlobStore().createContainerInLocation(null, cloudContainer);
+
+            LOG.info("Using container : " + cloudContainer);
+        } catch (Exception e) {
+            LOG.error("Error creating CloudBlobStore : ", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Uploads the block to the cloud service.
+     */
+    @Override
+    protected void storeBlock(byte[] digest, int level, byte[] data) throws IOException {
+        Preconditions.checkNotNull(context);
+
+        String id = StringUtils.convertBytesToHex(digest);
+
+        org.jclouds.blobstore.BlobStore blobStore = context.getBlobStore();
+
+        if (!blobStore.blobExists(cloudContainer, id)) {
+            Map<String, String> metadata = Maps.newHashMap();
+            metadata.put("level", String.valueOf(level));
+
+            Blob blob = blobStore.blobBuilder(id)
+                    .payload(data)
+                    .userMetadata(metadata)
+                    .build();
+            String etag = blobStore.putBlob(cloudContainer, blob, multipart());
+            LOG.debug("Blob " + id + " created with cloud tag : " + etag);
+        } else {
+            LOG.debug("Blob " + id + " already exists");
+        }
+    }
+
+    /**
+     * Reads the data from the actual cloud service.
+     */
+    @Override
+    protected byte[] readBlockFromBackend(BlockId blockId) throws Exception {
+        Preconditions.checkNotNull(context);
+
+        String id = StringUtils.convertBytesToHex(blockId.getDigest());
+
+        Blob cloudBlob = context.getBlobStore().getBlob(cloudContainer, id);
+        if (cloudBlob == null) {
+            String message = "Did not find block " + id;
+            LOG.error(message);
+            throw new IOException(message);
+        }
+
+        Payload payload = cloudBlob.getPayload();
+        try {
+            byte[] data = ByteStreams.toByteArray(payload.getInput());
+
+            if (blockId.getPos() == 0) {
+                return data;
+            }
+
+            int len = (int) (data.length - blockId.getPos());
+            if (len < 0) {
+                return new byte[0];
+            }
+            byte[] d2 = new byte[len];
+            System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
+            return d2;
+        } finally {
+            payload.close();
+        }
+    }
+
+    /**
+     * Deletes the cloud container with all its contents, and closes
+     * the blob store context.
+     */
+    public void deleteBucket() {
+        Preconditions.checkNotNull(context);
+
+        if (context.getBlobStore().containerExists(cloudContainer)) {
+            context.getBlobStore().deleteContainer(cloudContainer);
+        }
+        context.close();
+    }
+
+    @Override
+    public void startMark() throws IOException {
+        // No-op
+    }
+
+    @Override
+    protected void mark(BlockId id) throws Exception {
+        // No-op
+    }
+
+    @Override
+    public int sweep() throws IOException {
+        return 0;
+    }
+
+    @Override
+    protected boolean isMarkEnabled() {
+        return false;
+    }
+
+    @Override
+    public Iterator<String> getAllChunkIds(
+            long maxLastModifiedTime) throws Exception {
+        Preconditions.checkNotNull(context);
+
+        final org.jclouds.blobstore.BlobStore blobStore = context.getBlobStore();
+        return new CloudStoreIterator(blobStore, maxLastModifiedTime);
+    }
+
+    @Override
+    public boolean deleteChunk(String chunkId) throws Exception {
+        Preconditions.checkNotNull(context);
+
+        final org.jclouds.blobstore.BlobStore blobStore = context.getBlobStore();
+        blobStore.removeBlob(cloudContainer, chunkId);
+
+        return true;
+    }
+
+    class CloudStoreIterator implements Iterator<String> {
+        static final int BATCH = 1000;
+
+        org.jclouds.blobstore.BlobStore store;
+        long maxLastModifiedTime;
+
+        PageSet<? extends StorageMetadata> set;
+        ArrayDeque<String> queue;
+
+        public CloudStoreIterator(org.jclouds.blobstore.BlobStore store,
+                long maxLastModifiedTime) {
+            this.store = store;
+            this.maxLastModifiedTime = maxLastModifiedTime;
+            this.queue = new ArrayDeque<String>(BATCH);
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (set == null) {
+                set = store.list(cloudContainer, maxResults(BATCH));
+                loadElements(set);
+            }
+
+            if (!queue.isEmpty()) {
+                return true;
+            } else if (set.getNextMarker() != null) {
+                set = store.list(cloudContainer,
+                        maxResults(BATCH).afterMarker(set.getNextMarker()));
+                loadElements(set);
+
+                if (!queue.isEmpty()) {
+                    return true;
+                }
+            }
+
+            return false;
+        }
+
+        private void loadElements(PageSet<? extends StorageMetadata> set) {
+            Iterator<? extends StorageMetadata> iter = set.iterator();
+            while (iter.hasNext()) {
+                StorageMetadata metadata = iter.next();
+                // only queue chunks modified on or before maxLastModifiedTime;
+                // a value of 0 or -1 disables the time limit
+                if ((maxLastModifiedTime == 0 || maxLastModifiedTime == -1) ||
+                        (metadata.getLastModified().getTime() <= maxLastModifiedTime)) {
+                    queue.add(metadata.getName());
+                }
+            }
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException("No more elements");
+            }
+            return queue.poll();
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}

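For reference, a minimal sketch of wiring up the CloudBlobStore above. The provider id "aws-s3" is one of the standard jclouds provider ids; the credentials and container name are placeholders.

    import org.apache.jackrabbit.oak.plugins.blob.cloud.CloudBlobStore;

    public class CloudBlobStoreExample {
        public static void main(String[] args) throws Exception {
            CloudBlobStore store = new CloudBlobStore();
            store.setCloudProvider("aws-s3");        // jclouds provider id
            store.setAccessKey("<access-key>");      // placeholder
            store.setSecretKey("<secret-key>");      // placeholder
            store.setCloudContainer("oak-blobs");    // placeholder bucket
            // Builds the jclouds BlobStoreContext and creates the container
            // if it does not exist yet.
            store.init();
        }
    }

Since init() both builds the context and creates the container, a misconfigured provider or bad credentials will typically surface at startup rather than on the first read or write.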
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStoreBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java?rev=1568240&r1=1568239&r2=1568240&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java Fri Feb 14 11:21:31 2014
@@ -1,560 +1,560 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.blob.datastore;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.jackrabbit.core.data.CachingDataStore;
-import org.apache.jackrabbit.core.data.DataIdentifier;
-import org.apache.jackrabbit.core.data.DataRecord;
-import org.apache.jackrabbit.core.data.DataStore;
-import org.apache.jackrabbit.core.data.DataStoreException;
-import org.apache.jackrabbit.core.data.MultiDataStoreAware;
-import org.apache.jackrabbit.mk.blobs.GarbageCollectableBlobStore;
-import org.apache.jackrabbit.mk.util.Cache;
-import org.apache.jackrabbit.mk.util.IOUtils;
-import org.apache.jackrabbit.mk.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Strings;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.Lists;
-
-/**
- * A {@link BlobStore} implementation which is a compatibility wrapper for
- * Jackrabbit {@link DataStore}.
- * <p>
- * Uses a two-level cache to improve random read performance.
- * <p>
- * Caches each {@link InputStream} until it is fully read or closed. The
- * number of cached streams is controlled by the
- * {@link DataStoreConfiguration#getStreamCacheSize()} parameter.
- * <p>
- * Also uses a 16 MB byte[] block cache.
- * 
- */
-public class DataStoreBlobStore implements GarbageCollectableBlobStore,
-        Cache.Backend<DataStoreBlobStore.LogicalBlockId, DataStoreBlobStore.Data> {
-
-    /**
-     * Logger instance.
-     */
-    private static final Logger LOG = LoggerFactory.getLogger(DataStoreBlobStore.class);
-
-    protected static final int BLOCK_SIZE_LIMIT = 40;
-
-    private static final int DEFAULT_STREAM_CACHE_SIZE = 256;
-
-    /**
-     * The size of a block. 128 KB has been found to be as fast as larger
- * values, and faster than smaller values. 2 MB results in fewer files.
-     */
-    private int blockSize = 2 * 1024 * 1024;
-
-    /**
-     * The block cache (16 MB). Caches blocks up to blockSize.
-     */
-    private Cache<LogicalBlockId, Data> blockCache = Cache.newInstance(this, 16 * 1024 * 1024);
-
-    /** The stream cache size. */
-    protected int streamCacheSize;
-
-    /**
-     * The stream cache caches a number of streams to avoid opening a new stream
-     * on every random access read.
-     */
-    private LoadingCache<String, InputStream> streamCache;
-
-    private LoadingCache<String, Long> fileLengthCache;
-
-    /** The data store. */
-    private DataStore dataStore;
-
-    /**
-     * Gets the stream cache size.
-     * 
-     * @return the stream cache size
-     */
-    protected int getStreamCacheSize() {
-        return streamCacheSize;
-    }
-
-    /**
-     * Sets the stream cache size.
-     * 
-     * @param streamCacheSize
-     *            the new stream cache size
-     */
-    protected void setStreamCacheSize(int streamCacheSize) {
-        this.streamCacheSize = streamCacheSize;
-    }
-
-    /**
-     * Sets the block size.
-     * 
-     * @param x
-     *            the new block size
-     */
-    public final void setBlockSize(final int x) {
-        validateBlockSize(x);
-        this.blockSize = x;
-    }
-
-    /**
-     * Validates the block size.
-     * 
-     * @param x
-     *            the block size in bytes
-     */
-    private static void validateBlockSize(final int x) {
-        if (x < BLOCK_SIZE_LIMIT) {
-            throw new IllegalArgumentException("The minimum size must be bigger "
-                    + "than a content hash itself; limit = " + BLOCK_SIZE_LIMIT);
-        }
-    }
-
-    /**
-     * Initializes the blob store. If no stream cache size has been
-     * configured, the default of {@code DEFAULT_STREAM_CACHE_SIZE} (256)
-     * is used.
-     * 
-     * @param dataStore
-     *            the data store
-     */
-    public void init(DataStore dataStore) {
-        if (streamCacheSize <= 0) {
-            streamCacheSize = DEFAULT_STREAM_CACHE_SIZE;
-        }
-
-        streamCache = CacheBuilder.newBuilder().maximumSize(streamCacheSize)
-                .removalListener(new RemovalListener<String, InputStream>() {
-                    public void onRemoval(RemovalNotification<String, InputStream> removal) {
-                        InputStream stream = removal.getValue();
-                        IOUtils.closeQuietly(stream);
-                    }
-                }).build(new CacheLoader<String, InputStream>() {
-                    public InputStream load(String key) throws Exception {
-                        return loadStream(key);
-                    }
-                });
-        fileLengthCache = CacheBuilder.newBuilder().maximumSize(streamCacheSize)
-                .build(new CacheLoader<String, Long>() {
-                    @Override
-                    public Long load(String key) throws Exception {
-                        return getBlobLength(key);
-                    }
-                });
-        this.dataStore = dataStore;
-    }
-
-    /**
-     * Writes the input stream to the data store.
-     */
-    @Override
-    public String writeBlob(InputStream in) throws IOException {
-        try {
-            // add the record in the data store
-            DataRecord dataRec = dataStore.addRecord(in);
-            return dataRec.getIdentifier().toString();
-        } catch (DataStoreException e) {
-            throw new IOException(e);
-        } finally {
-            IOUtils.closeQuietly(in);
-        }
-    }
-
-    /**
-     * Reads the blob with the given blob id and range.
-     */
-    @Override
-    public int readBlob(String blobId, long pos, byte[] buff, int off, int length) throws IOException {
-        if (Strings.isNullOrEmpty(blobId)) {
-            return -1;
-        }
-
-        long blobLength;
-        try {
-            blobLength = fileLengthCache.get(blobId);
-        } catch (ExecutionException e) {
-            LOG.debug("File length cache error", e);
-            blobLength = getBlobLength(blobId);
-        }
-        LOG.debug("read {" + blobId + "}, {" + blobLength + "}");
-
-        long position = pos;
-        int offset = off;
-
-        if (position < blobLength) {
-            int totalLength = 0;
-            long bytesLeft = ((position + length) > blobLength ? blobLength - position : length);
-
-            // Reads all the logical blocks satisfying the required range
-            while (bytesLeft > 0) {
-                long posBlockStart = position / blockSize;
-                int posOffsetInBlock = (int) (position - posBlockStart * blockSize);
-
-                byte[] block = readBlock(blobId, posBlockStart);
-
-                long bytesToRead = Math.min(bytesLeft,
-                        Math.min((blobLength - posOffsetInBlock), (blockSize - posOffsetInBlock)));
-                System.arraycopy(block, posOffsetInBlock, buff, offset, (int) bytesToRead);
-
-                position += bytesToRead;
-                offset += bytesToRead;
-                totalLength += bytesToRead;
-                bytesLeft -= bytesToRead;
-            }
-            return totalLength;
-        } else {
-            LOG.trace("Blob read for pos " + pos + "," + (pos + length - 1) + " out of range");
-            return -1;
-        }
-    }
-
-    /**
-     * Gets the data store.
-     * 
-     * @return the data store
-     */
-    public DataStore getDataStore() {
-        return dataStore;
-    }
-
-    /**
-     * Sets the data store.
-     * 
-     * @param dataStore
-     *            the data store
-     */
-    protected void setDataStore(DataStore dataStore) {
-        this.dataStore = dataStore;
-    }
-
-    /**
-     * Load the block to the cache.
-     */
-    @Override
-    public final Data load(final LogicalBlockId id) {
-        byte[] data;
-        try {
-            data = readBlockFromBackend(id);
-        } catch (Exception e) {
-            throw new RuntimeException("failed to read block from backend, id " + id, e);
-        }
-        if (data == null) {
-            throw new IllegalArgumentException("The block with id " + id + " was not found");
-        }
-        LOG.debug("Read from backend (Cache Miss): " + id);
-        return new Data(data);
-    }
-
-    /**
-     * Gets the length of the blob identified by the blobId.
-     */
-    @Override
-    public final long getBlobLength(final String blobId) throws IOException {
-        if (Strings.isNullOrEmpty(blobId)) {
-            return 0;
-        }
-
-        Long length = null;
-        try {
-            if (dataStore instanceof CachingDataStore) {
-                length = ((CachingDataStore) dataStore).getLength(new DataIdentifier(blobId));
-            } else {
-                length = dataStore.getRecord(new DataIdentifier(blobId)).getLength();
-            }
-            return length;
-        } catch (DataStoreException e) {
-            throw new IOException("Could not get length of blob for id " + blobId, e);
-        }
-    }
-
-    /**
-     * Reads block from backend.
-     * 
-     * @param id
-     *            the id
-     * @return the byte[]
-     * @throws IOException
-     *             Signals that an I/O exception has occurred.
-     */
-    private byte[] readBlockFromBackend(final LogicalBlockId id) throws IOException {
-        String key = StringUtils.convertBytesToHex(id.digest);
-        InputStream stream = null;
-        try {
-            stream = streamCache.get(key);
-        } catch (ExecutionException e) {
-            LOG.debug("Error retrieving from stream cache : " + key, e);
-        }
-
-        if (stream == null) {
-            throw new IOException("Could not load the stream for block " + key);
-        }
-
-        byte[] block = new byte[blockSize];
-        org.apache.commons.io.IOUtils.read(stream, block, 0, blockSize);
-
-        if (stream.available() <= 0) {
-            streamCache.invalidate(key);
-        }
-        return block;
-    }
-
-    /**
-     * Loads the stream from the data store.
-     * 
-     * @param key
-     *            the key
-     * @return the input stream
-     * @throws IOException
-     *             Signals that an I/O exception has occurred.
-     */
-    private InputStream loadStream(String key) throws IOException {
-        InputStream stream = null;
-        try {
-            stream = dataStore.getRecord(new DataIdentifier(key)).getStream();
-        } catch (DataStoreException e) {
-            throw new IOException("Could not read blob for id " + key, e);
-        }
-        return stream;
-    }
-
-    /**
-     * Reads a block, serving it from the block cache when possible.
-     * 
-     * @param blobId
-     *            the blob id
-     * @param posStart
-     *            the starting position of the block
-     * @return the block data
-     * @throws IOException
-     *             if the block could not be read
-     */
-    private byte[] readBlock(final String blobId, final long posStart) throws IOException {
-        byte[] digest = StringUtils.convertHexToBytes(blobId);
-        LogicalBlockId id = new LogicalBlockId(digest, posStart);
-
-        LOG.debug("Trying to read from cache : " + blobId + ", " + posStart);
-
-        return blockCache.get(id).data;
-    }
-
-    /**
-     * Deletes all blobs older than the given time.
-     * 
-     * @param time
-     *            the timestamp; records older than this are deleted
-     * @return the number of records deleted
-     * @throws Exception
-     *             if the deletion fails
-     */
-    public int deleteAllOlderThan(long time) throws Exception {
-        return dataStore.deleteAllOlderThan(time);
-    }
-
-    /**
-     * A file is divided into logical chunks. Blocks are small enough to fit in
-     * memory, so they can be cached.
-     */
-    public static class LogicalBlockId {
-
-        /** The digest. */
-        final byte[] digest;
-
-        /** The starting pos. */
-        final long pos;
-
-        /**
-         * Instantiates a new logical block id.
-         * 
-         * @param digest
-         *            the digest
-         * @param pos
-         *            the starting position of the block
-         */
-        LogicalBlockId(final byte[] digest, final long pos) {
-            this.digest = digest;
-            this.pos = pos;
-        }
-
-        @Override
-        public final boolean equals(final Object other) {
-            if (this == other) {
-                return true;
-            }
-            if (other == null || !(other instanceof LogicalBlockId)) {
-                return false;
-            }
-            LogicalBlockId o = (LogicalBlockId) other;
-            return Arrays.equals(digest, o.digest) && pos == o.pos;
-        }
-
-        @Override
-        public final int hashCode() {
-            return Arrays.hashCode(digest) ^ (int) (pos >> 32) ^ (int) pos;
-        }
-
-        @Override
-        public final String toString() {
-            return StringUtils.convertBytesToHex(digest) + "@" + pos;
-        }
-
-        /**
-         * Gets the digest.
-         * 
-         * @return the digest
-         */
-        public final byte[] getDigest() {
-            return digest;
-        }
-
-        /**
-         * Gets the starting position.
-         * 
-         * @return the starting position
-         */
-        public final long getPos() {
-            return pos;
-        }
-    }
-
-    /**
-     * The data for a block.
-     */
-    public static class Data implements Cache.Value {
-
-        /** The data. */
-        final byte[] data;
-
-        /**
-         * Instantiates a new data.
-         * 
-         * @param data
-         *            the data
-         */
-        Data(final byte[] data) {
-            this.data = data;
-        }
-
-        @Override
-        public final String toString() {
-            String s = StringUtils.convertBytesToHex(data);
-            return s.length() > 100 ? s.substring(0, 100) + ".. (len=" + data.length + ")" : s;
-        }
-
-        @Override
-        public final int getMemory() {
-            return data.length;
-        }
-    }
-
-    @Override
-    public String writeBlob(String tempFileName) throws IOException {
-        File file = new File(tempFileName);
-        InputStream in = null;
-        try {
-            in = new FileInputStream(file);
-            return writeBlob(in);
-        } finally {
-            if (in != null) {
-                in.close();
-            }
-            file.delete();
-        }
-    }
-
-    @Override
-    public int sweep() throws IOException {
-        // no-op
-        return 0;
-    }
-
-    @Override
-    public void startMark() throws IOException {
-    }
-
-    @Override
-    public void clearInUse() {
-        dataStore.clearInUse();
-    }
-
-    @Override
-    public void clearCache() {
-        // no-op
-    }
-
-    @Override
-    public long getBlockSizeMin() {
-        // no-op
-        return 0;
-    }
-
-    /**
-     * Ignores the maxLastModifiedTime currently.
-     */
-    @Override
-    public Iterator<String> getAllChunkIds(
-            long maxLastModifiedTime) throws Exception {
-        return new DataStoreIterator(dataStore.getAllIdentifiers());
-    }
-
-    @Override
-    public boolean deleteChunk(String blobId) throws Exception {
-        ((MultiDataStoreAware) dataStore).deleteRecord(new DataIdentifier(blobId));
-        return true;
-    }
-
-    @Override
-    public Iterator<String> resolveChunks(String blobId) throws IOException {
-        return Lists.newArrayList(blobId).iterator();
-    }
-
-    class DataStoreIterator implements Iterator<String> {
-        Iterator<DataIdentifier> backingIterator;
-
-        public DataStoreIterator(Iterator<DataIdentifier> backingIterator) {
-            this.backingIterator = backingIterator;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return backingIterator.hasNext();
-        }
-
-        @Override
-        public String next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException("No more elements");
-            }
-            return backingIterator.next().toString();
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.MultiDataStoreAware;
+import org.apache.jackrabbit.mk.blobs.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.mk.util.Cache;
+import org.apache.jackrabbit.mk.util.IOUtils;
+import org.apache.jackrabbit.mk.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Lists;
+
+/**
+ * A {@link BlobStore} implementation which is a compatibility wrapper for
+ * Jackrabbit {@link DataStore}.
+ * <p>
+ * Uses a two-level cache to improve random read performance.
+ * <p>
+ * Caches each {@link InputStream} until it is fully read or closed. The
+ * number of cached streams is controlled by the
+ * {@link DataStoreConfiguration#getStreamCacheSize()} parameter.
+ * <p>
+ * Also uses a 16 MB byte[] block cache.
+ * 
+ */
+public class DataStoreBlobStore implements GarbageCollectableBlobStore,
+        Cache.Backend<DataStoreBlobStore.LogicalBlockId, DataStoreBlobStore.Data> {
+
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(DataStoreBlobStore.class);
+
+    protected static final int BLOCK_SIZE_LIMIT = 40;
+
+    private static final int DEFAULT_STREAM_CACHE_SIZE = 256;
+
+    /**
+     * The size of a block. 128 KB has been found to be as fast as larger
+ * values, and faster than smaller values. 2 MB results in fewer files.
+     */
+    private int blockSize = 2 * 1024 * 1024;
+
+    /**
+     * The block cache (16 MB). Caches blocks up to blockSize.
+     */
+    private Cache<LogicalBlockId, Data> blockCache = Cache.newInstance(this, 16 * 1024 * 1024);
+
+    /** The stream cache size. */
+    protected int streamCacheSize;
+
+    /**
+     * The stream cache caches a number of streams to avoid opening a new stream
+     * on every random access read.
+     */
+    private LoadingCache<String, InputStream> streamCache;
+
+    private LoadingCache<String, Long> fileLengthCache;
+
+    /** The data store. */
+    private DataStore dataStore;
+
+    /**
+     * Gets the stream cache size.
+     * 
+     * @return the stream cache size
+     */
+    protected int getStreamCacheSize() {
+        return streamCacheSize;
+    }
+
+    /**
+     * Sets the stream cache size.
+     * 
+     * @param streamCacheSize
+     *            the new stream cache size
+     */
+    protected void setStreamCacheSize(int streamCacheSize) {
+        this.streamCacheSize = streamCacheSize;
+    }
+
+    /**
+     * Sets the block size.
+     * 
+     * @param x
+     *            the new block size
+     */
+    public final void setBlockSize(final int x) {
+        validateBlockSize(x);
+        this.blockSize = x;
+    }
+
+    /**
+     * Validates the block size.
+     * 
+     * @param x
+     *            the block size in bytes
+     */
+    private static void validateBlockSize(final int x) {
+        if (x < BLOCK_SIZE_LIMIT) {
+            throw new IllegalArgumentException("The minimum size must be bigger "
+                    + "than a content hash itself; limit = " + BLOCK_SIZE_LIMIT);
+        }
+    }
+
+    /**
+     * Initializes the blob store. If no stream cache size has been
+     * configured, the default of {@code DEFAULT_STREAM_CACHE_SIZE} (256)
+     * is used.
+     * 
+     * @param dataStore
+     *            the data store
+     */
+    public void init(DataStore dataStore) {
+        if (streamCacheSize <= 0) {
+            streamCacheSize = DEFAULT_STREAM_CACHE_SIZE;
+        }
+
+        streamCache = CacheBuilder.newBuilder().maximumSize(streamCacheSize)
+                .removalListener(new RemovalListener<String, InputStream>() {
+                    public void onRemoval(RemovalNotification<String, InputStream> removal) {
+                        InputStream stream = removal.getValue();
+                        IOUtils.closeQuietly(stream);
+                    }
+                }).build(new CacheLoader<String, InputStream>() {
+                    public InputStream load(String key) throws Exception {
+                        return loadStream(key);
+                    }
+                });
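+        // Blob lengths are cached as well, so range reads do not have to hit
+        // the backend for the length on every call.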
+        fileLengthCache = CacheBuilder.newBuilder().maximumSize(streamCacheSize)
+                .build(new CacheLoader<String, Long>() {
+                    @Override
+                    public Long load(String key) throws Exception {
+                        return getBlobLength(key);
+                    }
+                });
+        this.dataStore = dataStore;
+    }
+
+    /**
+     * Writes the input stream to the data store.
+     */
+    @Override
+    public String writeBlob(InputStream in) throws IOException {
+        try {
+            // add the record in the data store
+            DataRecord dataRec = dataStore.addRecord(in);
+            return dataRec.getIdentifier().toString();
+        } catch (DataStoreException e) {
+            throw new IOException(e);
+        } finally {
+            IOUtils.closeQuietly(in);
+        }
+    }
+
+    /**
+     * Reads the blob with the given blob id and range.
+     */
+    @Override
+    public int readBlob(String blobId, long pos, byte[] buff, int off, int length) throws IOException {
+        if (Strings.isNullOrEmpty(blobId)) {
+            return -1;
+        }
+
+        long blobLength;
+        try {
+            blobLength = fileLengthCache.get(blobId);
+        } catch (ExecutionException e) {
+            LOG.debug("File length cache error", e);
+            blobLength = getBlobLength(blobId);
+        }
+        LOG.debug("read {" + blobId + "}, {" + blobLength + "}");
+
+        long position = pos;
+        int offset = off;
+
+        if (position < blobLength) {
+            int totalLength = 0;
+            long bytesLeft = ((position + length) > blobLength ? blobLength - position : length);
+
+            // Reads all the logical blocks satisfying the required range
+            while (bytesLeft > 0) {
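+                // Map the absolute position to a (block index, offset) pair.
+                // For example, with the 2 MB block size, position 5 MB falls
+                // in block 2 at offset 1 MB.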
+                long posBlockStart = position / blockSize;
+                int posOffsetInBlock = (int) (position - posBlockStart * blockSize);
+
+                byte[] block = readBlock(blobId, posBlockStart);
+
+                long bytesToRead = Math.min(bytesLeft,
+                        Math.min((blobLength - position), (blockSize - posOffsetInBlock)));
+                System.arraycopy(block, posOffsetInBlock, buff, offset, (int) bytesToRead);
+
+                position += bytesToRead;
+                offset += bytesToRead;
+                totalLength += bytesToRead;
+                bytesLeft -= bytesToRead;
+            }
+            return totalLength;
+        } else {
+            LOG.trace("Blob read for pos " + pos + "," + (pos + length - 1) + " out of range");
+            return -1;
+        }
+    }
+
+    /**
+     * Gets the data store.
+     * 
+     * @return the data store
+     */
+    public DataStore getDataStore() {
+        return dataStore;
+    }
+
+    /**
+     * Sets the data store.
+     * 
+     * @param dataStore
+     *            the data store
+     */
+    protected void setDataStore(DataStore dataStore) {
+        this.dataStore = dataStore;
+    }
+
+    /**
+     * Load the block to the cache.
+     */
+    @Override
+    public final Data load(final LogicalBlockId id) {
+        byte[] data;
+        try {
+            data = readBlockFromBackend(id);
+        } catch (Exception e) {
+            throw new RuntimeException("failed to read block from backend, id " + id, e);
+        }
+        if (data == null) {
+            throw new IllegalArgumentException("The block with id " + id + " was not found");
+        }
+        LOG.debug("Read from backend (Cache Miss): " + id);
+        return new Data(data);
+    }
+
+    /**
+     * Gets the length of the blob identified by the blobId.
+     */
+    @Override
+    public final long getBlobLength(final String blobId) throws IOException {
+        if (Strings.isNullOrEmpty(blobId)) {
+            return 0;
+        }
+
+        try {
+            if (dataStore instanceof CachingDataStore) {
+                return ((CachingDataStore) dataStore).getLength(new DataIdentifier(blobId));
+            }
+            return dataStore.getRecord(new DataIdentifier(blobId)).getLength();
+        } catch (DataStoreException e) {
+            throw new IOException("Could not get length of blob for id " + blobId, e);
+        }
+    }
+
+    /**
+     * Reads a block from the backend.
+     * 
+     * @param id
+     *            the block id
+     * @return the block data
+     * @throws IOException
+     *             Signals that an I/O exception has occurred.
+     */
+    private byte[] readBlockFromBackend(final LogicalBlockId id) throws IOException {
+        String key = StringUtils.convertBytesToHex(id.digest);
+        InputStream stream;
+        try {
+            stream = streamCache.get(key);
+        } catch (ExecutionException e) {
+            LOG.debug("Error retrieving from stream cache : {}", key, e);
+            // Fall back to a fresh stream so that a cache failure does not
+            // cause a NullPointerException below.
+            stream = loadStream(key);
+        }
+
+        // The stream is read sequentially; consecutive block reads for the
+        // same blob continue where the previous read stopped.
+        byte[] block = new byte[blockSize];
+        org.apache.commons.io.IOUtils.read(stream, block, 0, blockSize);
+
+        // An exhausted stream is evicted from the cache, which also closes it
+        // through the removal listener.
+        if (stream.available() <= 0) {
+            streamCache.invalidate(key);
+        }
+        return block;
+    }
+
+    /**
+     * Loads the stream from the data store.
+     * 
+     * @param key
+     *            the key
+     * @return the input stream
+     * @throws IOException
+     *             Signals that an I/O exception has occurred.
+     */
+    private InputStream loadStream(String key) throws IOException {
+        InputStream stream = null;
+        try {
+            stream = dataStore.getRecord(new DataIdentifier(key)).getStream();
+        } catch (DataStoreException e) {
+            throw new IOException("Could not read blob for id " + key, e);
+        }
+        return stream;
+    }
+
+    /**
+     * Reads a block from the cache, loading it from the backend on a miss.
+     * 
+     * @param blobId
+     *            the blob id
+     * @param posStart
+     *            the starting position of the block
+     * @return the block data
+     * @throws IOException
+     *             Signals that an I/O exception has occurred.
+     */
+    private byte[] readBlock(final String blobId, final long posStart) throws IOException {
+        byte[] digest = StringUtils.convertHexToBytes(blobId);
+        LogicalBlockId id = new LogicalBlockId(digest, posStart);
+
+        LOG.debug("Trying to read from cache : " + blobId + ", " + posStart);
+
+        return blockCache.get(id).data;
+    }
+
+    /**
+     * Deletes all blobs older than the given time.
+     * 
+     * @param time
+     *            the time
+     * @return the number of records deleted
+     * @throws Exception
+     *             the exception
+     */
+    public int deleteAllOlderThan(long time) throws Exception {
+        return dataStore.deleteAllOlderThan(time);
+    }
+
+    /**
+     * A file is divided into logical chunks. Blocks are small enough to fit in
+     * memory, so they can be cached.
+     */
+    public static class LogicalBlockId {
+
+        /** The digest. */
+        final byte[] digest;
+
+        /** The starting pos. */
+        final long pos;
+
+        /**
+         * Instantiates a new logical block id.
+         * 
+         * @param digest
+         *            the digest
+         * @param pos
+         *            the starting position of the block
+         */
+        LogicalBlockId(final byte[] digest, final long pos) {
+            this.digest = digest;
+            this.pos = pos;
+        }
+
+        @Override
+        public final boolean equals(final Object other) {
+            if (this == other) {
+                return true;
+            }
+            if (!(other instanceof LogicalBlockId)) {
+                return false;
+            }
+            LogicalBlockId o = (LogicalBlockId) other;
+            return Arrays.equals(digest, o.digest) && pos == o.pos;
+        }
+
+        @Override
+        public final int hashCode() {
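+            // Mix the digest hash with both 32-bit halves of the position.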
+            return Arrays.hashCode(digest) ^ (int) (pos >> 32) ^ (int) pos;
+        }
+
+        @Override
+        public final String toString() {
+            return StringUtils.convertBytesToHex(digest) + "@" + pos;
+        }
+
+        /**
+         * Gets the digest.
+         * 
+         * @return the digest
+         */
+        public final byte[] getDigest() {
+            return digest;
+        }
+
+        /**
+         * Gets the starting position.
+         * 
+         * @return the starting position
+         */
+        public final long getPos() {
+            return pos;
+        }
+    }
+
+    /**
+     * The data for a block.
+     */
+    public static class Data implements Cache.Value {
+
+        /** The data. */
+        final byte[] data;
+
+        /**
+         * Instantiates a new data value wrapping the given bytes.
+         * 
+         * @param data
+         *            the data
+         */
+        Data(final byte[] data) {
+            this.data = data;
+        }
+
+        @Override
+        public final String toString() {
+            String s = StringUtils.convertBytesToHex(data);
+            return s.length() > 100 ? s.substring(0, 100) + ".. (len=" + data.length + ")" : s;
+        }
+
+        @Override
+        public final int getMemory() {
+            return data.length;
+        }
+    }
+
+    @Override
+    public String writeBlob(String tempFileName) throws IOException {
+        File file = new File(tempFileName);
+        InputStream in = null;
+        try {
+            in = new FileInputStream(file);
+            return writeBlob(in);
+        } finally {
+            if (in != null) {
+                in.close();
+            }
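+            // The temporary file is deleted even when the write fails.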
+            file.delete();
+        }
+    }
+
+    @Override
+    public int sweep() throws IOException {
+        // no-op
+        return 0;
+    }
+
+    @Override
+    public void startMark() throws IOException {
+    }
+
+    @Override
+    public void clearInUse() {
+        dataStore.clearInUse();
+    }
+
+    @Override
+    public void clearCache() {
+        // no-op
+    }
+
+    @Override
+    public long getBlockSizeMin() {
+        // no-op
+        return 0;
+    }
+
+    /**
+     * Returns all chunk ids. The maxLastModifiedTime parameter is currently
+     * ignored.
+     */
+    @Override
+    public Iterator<String> getAllChunkIds(
+            long maxLastModifiedTime) throws Exception {
+        return new DataStoreIterator(dataStore.getAllIdentifiers());
+    }
+
+    @Override
+    public boolean deleteChunk(String blobId) throws Exception {
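+        // Assumes the configured DataStore implements MultiDataStoreAware;
+        // the cast fails for stores that do not support record deletion.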
+        ((MultiDataStoreAware) dataStore).deleteRecord(new DataIdentifier(blobId));
+        return true;
+    }
+
+    @Override
+    public Iterator<String> resolveChunks(String blobId) throws IOException {
+        return Lists.newArrayList(blobId).iterator();
+    }
+
+    class DataStoreIterator implements Iterator<String> {
+        Iterator<DataIdentifier> backingIterator;
+
+        public DataStoreIterator(Iterator<DataIdentifier> backingIterator) {
+            this.backingIterator = backingIterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return backingIterator.hasNext();
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException("No more elements");
+            }
+            return backingIterator.next().toString();
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStoreBuilder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStoreBuilder.java?rev=1568240&r1=1568239&r2=1568240&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStoreBuilder.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStoreBuilder.java Fri Feb 14 11:21:31 2014
@@ -1,138 +1,138 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.blob.datastore;
-
-import javax.jcr.RepositoryException;
-
-import org.apache.commons.beanutils.BeanUtils;
-import org.apache.jackrabbit.core.data.Backend;
-import org.apache.jackrabbit.core.data.CachingDataStore;
-import org.apache.jackrabbit.core.data.DataStore;
-import org.apache.jackrabbit.core.data.FileDataStore;
-import org.apache.jackrabbit.core.data.MultiDataStore;
-import org.apache.jackrabbit.core.data.db.DbDataStore;
-import org.apache.jackrabbit.core.util.db.ConnectionFactory;
-import org.apache.jackrabbit.mk.blobs.BlobStore;
-import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBuilder;
-import org.apache.jackrabbit.oak.plugins.blob.BlobStoreConfiguration;
-
-import com.google.common.base.Optional;
-
-/**
- * Helper class to create {@link DataStoreBlobStore} instance and inject the
- * appropriate Jackrabbit {@link DataStore} instance based on the configuration.
- */
-public class DataStoreBlobStoreBuilder implements BlobStoreBuilder {
-
-    private static final DataStoreBlobStoreBuilder INSTANCE = new DataStoreBlobStoreBuilder();
-
-    public static DataStoreBlobStoreBuilder newInstance() {
-        return INSTANCE;
-    }
-
-    /**
-     * Creates the wrapper {@link BlobStore} instance for Jackrabbit
-     * {@link DataStore}.
-     * 
-     * @param configuration
-     *            the configuration
-     * @return the dS blob store wrapped as{@link Optional} indicating that the
-     *         value can be null when a valid configuration is not available
-     * @throws Exception
-     *             the exception
-     */
-    @Override
-    public Optional<BlobStore> build(BlobStoreConfiguration configuration) throws Exception {
-        BlobStore blobStore = null;
-
-        DataStore store = getDataStore(configuration);
-        if (store != null) {
-            blobStore = new DataStoreBlobStore();
-            BeanUtils.populate(blobStore, configuration.getConfigMap());
-            ((DataStoreBlobStore) blobStore).init(store);
-        }
-        return Optional.fromNullable(blobStore);
-    }
-
-    /**
-     * Gets the data store based on the DataStoreProvider.
-     * 
-     * @param dataStoreConfig
-     *            the data store config
-     * @param dataStoreType
-     *            the data store type
-     * @return the data store
-     * @throws RepositoryException
-     *             the repository exception
-     */
-    private DataStore getDataStore(BlobStoreConfiguration config) throws Exception {
-        return getDataStore(
-                (String) config.getProperty(BlobStoreConfiguration.PROP_DATA_STORE), config);
-    }
-
-    private DataStore getDataStore(String dataStoreType, BlobStoreConfiguration config) throws Exception {
-        DataStore dataStore = (DataStore) Class.forName(dataStoreType).newInstance();
-        BeanUtils.populate(dataStore, config.getConfigMap());
-
-        if (dataStore instanceof DbDataStore) {
-            ((DbDataStore) dataStore)
-                    .setConnectionFactory(new ConnectionFactory());
-        }
-
-        if (dataStore instanceof MultiDataStore) {
-            DataStore primary =
-                    getDataStore(
-                            (String) config.getProperty(BlobStoreConfiguration.PRIMARY_DATA_STORE), config);
-            DataStore archive =
-                    getDataStore(
-                            (String) config.getProperty(BlobStoreConfiguration.ARCHIVE_DATA_STORE), config);
-            ((MultiDataStore) dataStore)
-                    .setPrimaryDataStore(primary);
-            ((MultiDataStore) dataStore)
-                    .setArchiveDataStore(archive);
-            dataStore.init(null);
-        } else if (!(dataStore instanceof FileDataStore)
-                && !(dataStore instanceof CachingDataStore)) {
-            dataStore.init(null);
-            return wrapInCachingDataStore(dataStore, config);
-        }
-        else {
-            dataStore.init(null);
-        }
-
-        return dataStore;
-    }
-
-    private DataStore wrapInCachingDataStore(final DataStore dataStore, BlobStoreConfiguration config) throws Exception {
-        CachingDataStore cachingStore = new CachingDataStore() {
-            @Override
-            protected Backend createBackend() {
-                return new DataStoreWrapperBackend(dataStore);
-            }
-
-            @Override
-            protected String getMarkerFile() {
-                return "db.init.done";
-            }
-        };
-
-        BeanUtils.populate(cachingStore, config.getConfigMap());
-        cachingStore.init(null);
-
-        return cachingStore;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import javax.jcr.RepositoryException;
+
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.jackrabbit.core.data.Backend;
+import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.FileDataStore;
+import org.apache.jackrabbit.core.data.MultiDataStore;
+import org.apache.jackrabbit.core.data.db.DbDataStore;
+import org.apache.jackrabbit.core.util.db.ConnectionFactory;
+import org.apache.jackrabbit.mk.blobs.BlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBuilder;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreConfiguration;
+
+import com.google.common.base.Optional;
+
+/**
+ * Helper class to create {@link DataStoreBlobStore} instance and inject the
+ * appropriate Jackrabbit {@link DataStore} instance based on the configuration.
+ */
+public class DataStoreBlobStoreBuilder implements BlobStoreBuilder {
+
+    private static final DataStoreBlobStoreBuilder INSTANCE = new DataStoreBlobStoreBuilder();
+
+    public static DataStoreBlobStoreBuilder newInstance() {
+        return INSTANCE;
+    }
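+
+    // A minimal usage sketch (hypothetical; assumes a populated
+    // BlobStoreConfiguration instance named "configuration"):
+    //
+    //   Optional<BlobStore> blobStore =
+    //           DataStoreBlobStoreBuilder.newInstance().build(configuration);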
+
+    /**
+     * Creates the wrapper {@link BlobStore} instance for a Jackrabbit
+     * {@link DataStore}.
+     * 
+     * @param configuration
+     *            the configuration
+     * @return the {@link DataStoreBlobStore} wrapped as an {@link Optional},
+     *         which is absent when a valid configuration is not available
+     * @throws Exception
+     *             the exception
+     */
+    @Override
+    public Optional<BlobStore> build(BlobStoreConfiguration configuration) throws Exception {
+        BlobStore blobStore = null;
+
+        DataStore store = getDataStore(configuration);
+        if (store != null) {
+            blobStore = new DataStoreBlobStore();
+            BeanUtils.populate(blobStore, configuration.getConfigMap());
+            ((DataStoreBlobStore) blobStore).init(store);
+        }
+        return Optional.fromNullable(blobStore);
+    }
+
+    /**
+     * Gets the data store based on the configured DataStoreProvider.
+     * 
+     * @param config
+     *            the blob store configuration
+     * @return the data store
+     * @throws Exception
+     *             the exception
+     */
+    private DataStore getDataStore(BlobStoreConfiguration config) throws Exception {
+        return getDataStore(
+                (String) config.getProperty(BlobStoreConfiguration.PROP_DATA_STORE), config);
+    }
+
+    private DataStore getDataStore(String dataStoreType, BlobStoreConfiguration config) throws Exception {
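+        // Instantiate the configured DataStore class reflectively and populate
+        // its bean properties from the configuration map.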
+        DataStore dataStore = (DataStore) Class.forName(dataStoreType).newInstance();
+        BeanUtils.populate(dataStore, config.getConfigMap());
+
+        if (dataStore instanceof DbDataStore) {
+            ((DbDataStore) dataStore)
+                    .setConnectionFactory(new ConnectionFactory());
+        }
+
+        if (dataStore instanceof MultiDataStore) {
+            DataStore primary =
+                    getDataStore(
+                            (String) config.getProperty(BlobStoreConfiguration.PRIMARY_DATA_STORE), config);
+            DataStore archive =
+                    getDataStore(
+                            (String) config.getProperty(BlobStoreConfiguration.ARCHIVE_DATA_STORE), config);
+            ((MultiDataStore) dataStore)
+                    .setPrimaryDataStore(primary);
+            ((MultiDataStore) dataStore)
+                    .setArchiveDataStore(archive);
+            dataStore.init(null);
+        } else if (!(dataStore instanceof FileDataStore)
+                && !(dataStore instanceof CachingDataStore)) {
+            dataStore.init(null);
+            return wrapInCachingDataStore(dataStore, config);
+        } else {
+            dataStore.init(null);
+        }
+
+        return dataStore;
+    }
+
+    private DataStore wrapInCachingDataStore(final DataStore dataStore, BlobStoreConfiguration config) throws Exception {
+        CachingDataStore cachingStore = new CachingDataStore() {
+            @Override
+            protected Backend createBackend() {
+                return new DataStoreWrapperBackend(dataStore);
+            }
+
+            @Override
+            protected String getMarkerFile() {
+                return "db.init.done";
+            }
+        };
+
+        BeanUtils.populate(cachingStore, config.getConfigMap());
+        cachingStore.init(null);
+
+        return cachingStore;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStoreBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native


