jackrabbit-commits mailing list archives

From: mreut...@apache.org
Subject: svn commit: r745415 - in /jackrabbit/sandbox/jackrabbit-hadoop: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/jackrabbit/ src/main/java/org/apache/jackrabbit/hadoop/
Date: Wed, 18 Feb 2009 08:10:23 GMT
Author: mreutegg
Date: Wed Feb 18 08:10:23 2009
New Revision: 745415

URL: http://svn.apache.org/viewvc?rev=745415&view=rev
Log:
PersistenceManager and DataStore on Hadoop HBase and HDFS

Added:
    jackrabbit/sandbox/jackrabbit-hadoop/   (with props)
    jackrabbit/sandbox/jackrabbit-hadoop/README.txt   (with props)
    jackrabbit/sandbox/jackrabbit-hadoop/pom.xml   (with props)
    jackrabbit/sandbox/jackrabbit-hadoop/src/
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java   (with props)
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HadoopDataRecord.java   (with props)
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HadoopDataStore.java   (with props)

Propchange: jackrabbit/sandbox/jackrabbit-hadoop/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Feb 18 08:10:23 2009
@@ -0,0 +1,5 @@
+*.iml
+*.ipr
+*.iws
+*.log
+target

Added: jackrabbit/sandbox/jackrabbit-hadoop/README.txt
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/README.txt?rev=745415&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/README.txt (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/README.txt Wed Feb 18 08:10:23 2009
@@ -0,0 +1,11 @@
+This module provides an implementation of a PersistenceManager
+on top of Hadoop HBase and a DataStore on Hadoop HDFS.
+
+Building this module requires Java 6, and the two dependencies
+hbase-0.19.0.jar and hadoop-core-0.19.0.jar need to be deployed
+manually. Simply run 'mvn install' and you will get instructions
+on how this can be done.
+
+The default configuration for both the PersistenceManager and
+the DataStore assumes that Hadoop is running at
+hdfs://localhost:9000

Propchange: jackrabbit/sandbox/jackrabbit-hadoop/README.txt
------------------------------------------------------------------------------
    svn:eol-style = native
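
A minimal configuration sketch, assuming only the setters that appear in the sources added below (setFs_default_name, setPath, setMinRecordLength, setRootdir, setTablePrefix): it wires both components programmatically and overrides the default HDFS location. In a regular Jackrabbit deployment the same properties would normally be supplied as <param> entries in repository.xml/workspace.xml; the host name, paths and table prefix here are placeholders, not part of this commit.

    import org.apache.jackrabbit.hadoop.HBasePersistenceManager;
    import org.apache.jackrabbit.hadoop.HadoopDataStore;

    public class HadoopConfigSketch {
        public static void main(String[] args) throws Exception {
            // DataStore on HDFS: override the default hdfs://localhost:9000
            HadoopDataStore store = new HadoopDataStore();
            store.setFs_default_name("hdfs://namenode.example.org:9000"); // placeholder host
            store.setPath("datastore");          // directory inside HDFS ("datastore" is the default)
            store.setMinRecordLength(100);       // minimum binary size kept in the data store
            store.init("/var/jackrabbit/repo");  // home dir argument is not used by this store

            // PersistenceManager on HBase: tables are named <prefix>bundles and <prefix>noderefs
            HBasePersistenceManager pm = new HBasePersistenceManager();
            pm.setTablePrefix("default_");                           // placeholder prefix
            pm.setRootdir("hdfs://namenode.example.org:9000/hbase"); // placeholder rootdir
            // pm.init(PMContext) is normally invoked by the repository itself
        }
    }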

Added: jackrabbit/sandbox/jackrabbit-hadoop/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/pom.xml?rev=745415&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/pom.xml (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/pom.xml Wed Feb 18 08:10:23 2009
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                             http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+<!-- ====================================================================== -->
+<!-- P R O J E C T  D E S C R I P T I O N                                   -->
+<!-- ====================================================================== -->
+  <groupId>org.apache.jackrabbit</groupId>
+  <artifactId>jackrabbit-hadoop</artifactId>
+  <version>1.6-SNAPSHOT</version>
+  <name>Jackrabbit on Hadoop and HBase</name>
+  <description>Jackrabbit PersistenceManager on HBase and DataStore on Hadoop</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>javax.jcr</groupId>
+      <artifactId>jcr</artifactId>
+      <version>1.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>jackrabbit-core</artifactId>
+      <version>1.6-SNAPSHOT</version>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>1.5.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jcl-over-slf4j</artifactId>
+      <version>1.5.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hbase</artifactId>
+      <version>0.19.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-core</artifactId>
+      <version>0.19.0</version>
+    </dependency>
+  </dependencies>
+  
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <target>1.6</target>
+          <source>1.6</source>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-idea-plugin</artifactId>
+        <version>2.0</version>
+        <configuration>
+          <downloadSources>true</downloadSources>
+          <jdkLevel>1.6</jdkLevel>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Propchange: jackrabbit/sandbox/jackrabbit-hadoop/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java?rev=745415&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java Wed Feb 18 08:10:23 2009
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.hadoop;
+
+import java.io.IOException;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+import java.util.Iterator;
+
+import javax.jcr.RepositoryException;
+
+import org.apache.jackrabbit.core.persistence.bundle.AbstractBundlePersistenceManager;
+import org.apache.jackrabbit.core.persistence.bundle.util.NodePropBundle;
+import org.apache.jackrabbit.core.persistence.bundle.util.BundleBinding;
+import org.apache.jackrabbit.core.persistence.bundle.util.ErrorHandling;
+import org.apache.jackrabbit.core.persistence.PMContext;
+import org.apache.jackrabbit.core.persistence.util.Serializer;
+import org.apache.jackrabbit.core.NodeId;
+import org.apache.jackrabbit.core.NodeIdIterator;
+import org.apache.jackrabbit.core.state.ItemStateException;
+import org.apache.jackrabbit.core.state.NodeReferences;
+import org.apache.jackrabbit.core.state.NodeReferencesId;
+import org.apache.jackrabbit.core.state.NoSuchItemStateException;
+import org.apache.jackrabbit.core.state.ChangeLog;
+import org.apache.jackrabbit.uuid.UUID;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+
+/**
+ * <code>HBasePersistenceManager</code> stores node bundles and node references as rows in HBase tables, keyed by node UUID.
+ */
+public class HBasePersistenceManager extends AbstractBundlePersistenceManager {
+
+    private String tablePrefix = null;
+
+    private static final byte[] DATA_COLUMN = Bytes.toBytes("data:");
+
+    private HBaseConfiguration config = new HBaseConfiguration();
+
+    private TransactionalTable bundles;
+
+    private TransactionalTable noderefs;
+
+    private BundleBinding binding;
+
+    private TransactionManager txMgr;
+
+    private TransactionState txState;
+
+    public void init(PMContext context) throws Exception {
+        super.init(context);
+        // always use transaction interface
+        config.set("hbase.regionserver.class", "org.apache.hadoop.hbase.ipc.TransactionalRegionInterface");
+        config.set("hbase.regionserver.impl", "org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer");
+
+        // table prefix
+        if (tablePrefix == null) {
+            // use name of home directory, this is usually the workspace name
+            tablePrefix = context.getHomeDir().getName() + "_";
+        }
+        // set table names
+        byte[] bundlesTable = Bytes.toBytes(tablePrefix + "bundles");
+        byte[] nodeRefsTable = Bytes.toBytes(tablePrefix + "noderefs");
+
+        HBaseAdmin admin = new HBaseAdmin(config);
+        if (!admin.tableExists(bundlesTable)) {
+            HTableDescriptor bundle = new HTableDescriptor(bundlesTable);
+            bundle.addFamily(new HColumnDescriptor(DATA_COLUMN));
+            admin.createTable(bundle);
+            admin.enableTable(bundlesTable);
+        }
+        if (!admin.tableExists(nodeRefsTable)) {
+            HTableDescriptor noderefs = new HTableDescriptor(nodeRefsTable);
+            noderefs.addFamily(new HColumnDescriptor(DATA_COLUMN));
+            admin.createTable(noderefs);
+            admin.enableTable(nodeRefsTable);
+        }
+        txMgr = new TransactionManager(config);
+
+        bundles = new TransactionalTable(config, bundlesTable);
+        noderefs = new TransactionalTable(config, nodeRefsTable);
+
+        binding = new BundleBinding(new ErrorHandling(ErrorHandling.IGNORE_MISSING_BLOBS),
+                null, getNsIndex(), getNameIndex(), context.getDataStore());
+        binding.setMinBlobSize(Long.MAX_VALUE);
+    }
+
+    public synchronized void store(ChangeLog changeLog)
+            throws ItemStateException {
+        txState = txMgr.beginTransaction();
+        try {
+            boolean success = false;
+            try {
+                super.store(changeLog);
+                txMgr.tryCommit(txState);
+                success = true;
+            } catch (Exception e) {
+                throw new ItemStateException(e.getMessage(), e);
+            } finally {
+                if (!success) {
+                    try {
+                        txMgr.abort(txState);
+                    } catch (IOException e) {
+                        throw new ItemStateException(e.getMessage(), e);
+                    }
+                }
+            }
+        } finally {
+            txState = null;
+        }
+    }
+
+    protected NodePropBundle loadBundle(NodeId id) throws ItemStateException {
+        try {
+            Cell cell = bundles.get(id.getUUID().getRawBytes(), DATA_COLUMN);
+            if (cell == null) {
+                return null;
+            }
+            return getBinding().readBundle(new DataInputStream(
+                    new ByteArrayInputStream(cell.getValue())), id);
+        } catch (IOException e) {
+            throw new ItemStateException(e.getMessage(), e);
+        }
+    }
+
+    protected boolean existsBundle(NodeId id) throws ItemStateException {
+        try {
+            return bundles.exists(id.getUUID().getRawBytes());
+        } catch (IOException e) {
+            throw new ItemStateException(e.getMessage(), e);
+        }
+    }
+
+    protected void storeBundle(NodePropBundle bundle)
+            throws ItemStateException {
+        try {
+            ByteArrayOutputStream data = new ByteArrayOutputStream((int) bundle.getSize());
+            DataOutputStream dos = new DataOutputStream(data);
+            binding.writeBundle(dos, bundle);
+            dos.flush();
+            BatchUpdate update = new BatchUpdate(bundle.getId().getUUID().getRawBytes());
+            update.put(DATA_COLUMN, data.toByteArray());
+            bundles.commit(txState, update);
+        } catch (IOException e) {
+            throw new ItemStateException(e.getMessage(), e);
+        }
+    }
+
+    protected void destroyBundle(NodePropBundle bundle)
+            throws ItemStateException {
+        try {
+            bundles.deleteAll(txState, bundle.getId().getUUID().getRawBytes(), HConstants.LATEST_TIMESTAMP);
+        } catch (IOException e) {
+            throw new ItemStateException(e.getMessage(), e);
+        }
+    }
+
+    public NodeReferences load(NodeReferencesId id)
+            throws NoSuchItemStateException, ItemStateException {
+        try {
+            Cell cell = noderefs.get(id.getTargetId().getUUID().getRawBytes(), DATA_COLUMN);
+            if (cell != null) {
+                NodeReferences refs = new NodeReferences(id);
+                Serializer.deserialize(refs, new ByteArrayInputStream(cell.getValue()));
+                return refs;
+            }
+        } catch (Exception e) {
+            throw new ItemStateException(e.getMessage(), e);
+        }
+        throw new NoSuchItemStateException(id.getTargetId().toString());
+    }
+
+    protected void destroy(NodeReferences refs) throws ItemStateException {
+        try {
+            noderefs.deleteAll(txState, refs.getTargetId().getUUID().getRawBytes(), HConstants.LATEST_TIMESTAMP);
+        } catch (IOException e) {
+            throw new ItemStateException(e.getMessage(), e);
+        }
+    }
+
+    protected void store(NodeReferences refs) throws ItemStateException {
+        try {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            Serializer.serialize(refs, out);
+            BatchUpdate update = new BatchUpdate(refs.getTargetId().getUUID().getRawBytes());
+            update.put(DATA_COLUMN, out.toByteArray());
+            noderefs.commit(txState, update);
+        } catch (Exception e) {
+            throw new ItemStateException(e.getMessage(), e);
+        }
+
+    }
+
+    protected BundleBinding getBinding() {
+        return binding;
+    }
+
+    public void close() throws Exception {
+        try {
+            bundles.close();
+        } catch (IOException e) {
+            // TODO: log
+        }
+        try {
+            noderefs.close();
+        } catch (IOException e) {
+            // TODO: log
+        }
+    }
+
+    public boolean exists(NodeReferencesId id) throws ItemStateException {
+        try {
+            return noderefs.exists(id.getTargetId().getUUID().getRawBytes());
+        } catch (IOException e) {
+            throw new ItemStateException(e.getMessage(), e);
+        }
+    }
+
+    public NodeIdIterator getAllNodeIds(NodeId after, int maxCount)
+            throws ItemStateException, RepositoryException {
+        // TODO: introduce dummy column to avoid loading of data column?
+        try {
+            final List<NodeId> ids = new ArrayList<NodeId>();
+            Scanner scanner;
+            if (after != null) {
+                scanner = bundles.getScanner(new byte[][]{DATA_COLUMN}, after.getUUID().getRawBytes());
+            } else {
+                scanner = bundles.getScanner(new byte[][]{DATA_COLUMN});
+            }
+            for (RowResult row : scanner) {
+                NodeId id = new NodeId(new UUID(row.getRow()));
+                if (ids.isEmpty() && id.equals(after)) {
+                    // exclude after
+                    continue;
+                }
+                if (maxCount != 0 && ids.size() >= maxCount) {
+                    break;
+                }
+                ids.add(id);
+            }
+            return new NodeIdIterator() {
+
+                private Iterator<NodeId> it = ids.iterator();
+
+                public NodeId nextNodeId() throws NoSuchElementException {
+                    return it.next();
+                }
+
+                public boolean hasNext() {
+                    return it.hasNext();
+                }
+
+                public Object next() {
+                    return nextNodeId();
+                }
+
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        } catch (IOException e) {
+            throw new ItemStateException(e.getMessage(), e);
+        }
+    }
+
+    //---------------------------< parameters >---------------------------------
+
+    public String getRootdir() {
+        return config.get("hbase.rootdir");
+    }
+
+    public void setRootdir(String rootdir) {
+        config.set("hbase.rootdir", rootdir);
+    }
+
+    public String getTablePrefix() {
+        return tablePrefix;
+    }
+
+    public void setTablePrefix(String prefix) {
+        this.tablePrefix = prefix;
+    }
+}

Propchange: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java
------------------------------------------------------------------------------
    svn:eol-style = native
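
To make the storage layout of the persistence manager above concrete: each node bundle is one row in the '<tablePrefix>bundles' table, keyed by the raw UUID bytes of the node, with the serialized bundle held in the single 'data:' column; node references are stored the same way in '<tablePrefix>noderefs'. Below is a small sketch that reads such a row back with the plain HBase 0.19 client (the persistence manager itself goes through TransactionalTable); the table name and UUID are placeholders.

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.Cell;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.jackrabbit.uuid.UUID;

    public class BundleRowSketch {
        public static void main(String[] args) throws Exception {
            HBaseConfiguration config = new HBaseConfiguration();
            // table name = tablePrefix + "bundles", e.g. "default_bundles" for the default workspace
            HTable bundles = new HTable(config, Bytes.toBytes("default_bundles"));
            // row key = raw bytes of the node UUID (placeholder UUID)
            byte[] row = new UUID("cafebabe-cafe-babe-cafe-babecafebabe").getRawBytes();
            Cell cell = bundles.get(row, Bytes.toBytes("data:"));
            if (cell != null) {
                // cell.getValue() holds the bundle as written by BundleBinding.writeBundle()
                System.out.println("bundle record: " + cell.getValue().length + " bytes");
            }
            bundles.close();
        }
    }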

Added: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HadoopDataRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HadoopDataRecord.java?rev=745415&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HadoopDataRecord.java (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HadoopDataRecord.java Wed Feb 18 08:10:23 2009
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.hadoop;
+
+import java.io.InputStream;
+import java.io.IOException;
+
+import org.apache.jackrabbit.core.data.AbstractDataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * <code>HadoopDataRecord</code> is a data record backed by a file in a Hadoop file system.
+ */
+public class HadoopDataRecord extends AbstractDataRecord {
+
+    private final Path path;
+
+    private final FileSystem fs;
+
+    public HadoopDataRecord(DataIdentifier identifier, FileSystem fs, Path path) {
+        super(identifier);
+        this.fs = fs;
+        this.path = path;
+    }
+
+    public long getLength() throws DataStoreException {
+        try {
+            return fs.getFileStatus(path).getLen();
+        } catch (IOException e) {
+            throw new DataStoreException(e);
+        }
+    }
+
+    public InputStream getStream() throws DataStoreException {
+        try {
+            return fs.open(path);
+        } catch (IOException e) {
+            throw new DataStoreException(e);
+        }
+    }
+
+    public long getLastModified() {
+        try {
+            return fs.getFileStatus(path).getModificationTime();
+        } catch (IOException e) {
+            return 0;
+        }
+    }
+}

Propchange: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HadoopDataRecord.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HadoopDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HadoopDataStore.java?rev=745415&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HadoopDataStore.java (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HadoopDataStore.java Wed Feb 18 08:10:23 2009
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.hadoop;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Collections;
+import java.util.WeakHashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Random;
+import java.security.MessageDigest;
+import java.security.DigestOutputStream;
+import java.security.NoSuchAlgorithmException;
+
+import javax.jcr.RepositoryException;
+
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.io.IOUtils;
+
+/**
+ * <code>HadoopDataStore</code> is a content-addressed data store that keeps binaries in HDFS, identified by their SHA-1 digest.
+ */
+public class HadoopDataStore implements DataStore {
+
+    /**
+     * The digest algorithm used to uniquely identify records.
+     */
+    private static final String DIGEST = "SHA-1";
+
+    /**
+     * The default value for the minimum object size.
+     */
+    private static final int DEFAULT_MIN_RECORD_LENGTH = 100;
+
+    /**
+     * Name of the directory used for temporary files.
+     * Must be at least 3 characters.
+     */
+    private static final String TMP = "tmp";
+
+    /**
+     * The minimum modified date. If a file is accessed (read or write) with a modified date
+     * older than this value, the modified date is updated to the current time.
+     */
+    private long minModifiedDate;
+
+    /**
+     * The hadoop file system configuration.
+     */
+    private Configuration config = new Configuration();
+
+    /**
+     * The Hadoop file system that holds all the data record files. The structure
+     * of content within the data store directory is controlled by this class.
+     */
+    private FileSystem directory;
+
+    /**
+     * The name of the directory that contains all the data record files. The structure
+     * of content within this directory is controlled by this class.
+     */
+    private Path path;
+
+    /**
+     * The minimum size of an object that should be stored in this data store.
+     */
+    private int minRecordLength = DEFAULT_MIN_RECORD_LENGTH;
+
+    /**
+     * A random number generator used to generate temporary file names.
+     */
+    private final Random random = new Random();
+
+    /**
+     * All data identifiers that are currently in use are in this set until they are garbage collected.
+     */
+    protected Map<DataIdentifier, Object> inUse = Collections.synchronizedMap(new WeakHashMap<DataIdentifier, Object>());
+
+    /**
+     * Creates an uninitialized data store.
+     *
+     */
+    public HadoopDataStore() {
+        setFs_default_name("hdfs://localhost:9000");
+    }
+
+    /**
+     * Initializes the data store.
+     * If the path is not set, <code>datastore</code> is used.
+     * This directory is automatically created if it does not yet exist.
+     *
+     * @param homeDir the home directory
+     */
+    public void init(String homeDir) throws RepositoryException {
+        try {
+            directory = FileSystem.get(config);
+            directory.setWorkingDirectory(new Path("/"));
+            if (path == null) {
+                path = new Path("datastore");
+            }
+            directory.mkdirs(path);
+        } catch (IOException e) {
+            throw new RepositoryException(e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public DataRecord getRecordIfStored(DataIdentifier identifier)
+            throws DataStoreException {
+        Path p = getPath(identifier);
+        try {
+            synchronized (this) {
+                if (!directory.exists(p)) {
+                    return null;
+                }
+                if (minModifiedDate != 0) {
+                    FileStatus status = directory.getFileStatus(p);
+                    if (status.getModificationTime() < minModifiedDate) {
+                        directory.setTimes(p, System.currentTimeMillis(), -1);
+                    }
+                }
+                usesIdentifier(identifier);
+                return new HadoopDataRecord(identifier, directory, p);
+            }
+        } catch (IOException e) {
+            throw new DataStoreException(e);
+        }
+    }
+
+    /**
+     * Returns the record with the given identifier. Note that this method
+     * performs no sanity checks on the given identifier. It is up to the
+     * caller to ensure that only identifiers of previously created data
+     * records are used.
+     *
+     * @param identifier data identifier
+     * @return identified data record
+     */
+    public DataRecord getRecord(DataIdentifier identifier)
+            throws DataStoreException {
+        DataRecord record = getRecordIfStored(identifier);
+        if (record == null) {
+            throw new DataStoreException("Record not found: " + identifier);
+        }
+        return record;
+    }
+
+    private void usesIdentifier(DataIdentifier identifier) {
+        inUse.put(identifier, null);
+    }
+
+    /**
+     * Creates a new data record.
+     * The stream is first consumed and the contents are saved in a temporary file
+     * and the SHA-1 message digest of the stream is calculated. If a
+     * record with the same SHA-1 digest (and length) is found then it is
+     * returned. Otherwise the temporary file is moved in place to become
+     * the new data record that gets returned.
+     *
+     * @param input binary stream
+     * @return data record that contains the given stream
+     * @throws DataStoreException if the record could not be created
+     */
+    public DataRecord addRecord(InputStream input) throws DataStoreException {
+        Path temporary = null;
+        try {
+            temporary = generateTempPath();
+            DataIdentifier tempId = new DataIdentifier(temporary.getName());
+            usesIdentifier(tempId);
+            // Copy the stream to the temporary file and calculate the
+            // stream length and the message digest of the stream
+            long length = 0;
+            MessageDigest digest = MessageDigest.getInstance(DIGEST);
+            OutputStream output = new DigestOutputStream(
+                    directory.append(temporary), digest);
+            try {
+                length = IOUtils.copyLarge(input, output);
+            } finally {
+                output.close();
+            }
+            DataIdentifier identifier = new DataIdentifier(digest.digest());
+            Path file;
+
+            synchronized (this) {
+                // Check if the same record already exists, or
+                // move the temporary file in place if needed
+                usesIdentifier(identifier);
+                file = getPath(identifier);
+                Path parent = file.getParent();
+                directory.mkdirs(parent);
+                if (!directory.exists(file)) {
+                    if (!directory.rename(temporary, file)) {
+                        throw new IOException(
+                                "Can not rename " + temporary.toString()
+                                + " to " + file.toString()
+                                + " (media read only?)");
+                    }
+                } else {
+                    long now = System.currentTimeMillis();
+                    if (directory.getFileStatus(file).getModificationTime() < now) {
+                        directory.setTimes(file, now, -1);
+                    }
+                }
+                // Sanity checks on the record file. These should never fail,
+                // but better safe than sorry...
+                FileStatus status = directory.getFileStatus(file);
+                if (status.isDir()) {
+                    throw new IOException("Not a file: " + file);
+                }
+                if (status.getLen() != length) {
+                    throw new IOException(DIGEST + " collision: " + file);
+                }
+            }
+            // this will also make sure that
+            // tempId is not garbage collected until here
+            inUse.remove(tempId);
+            return new HadoopDataRecord(identifier, directory, file);
+        } catch (NoSuchAlgorithmException e) {
+            throw new DataStoreException(DIGEST + " not available", e);
+        } catch (IOException e) {
+            throw new DataStoreException("Could not add record", e);
+        } finally {
+            if (temporary != null) {
+                try {
+                    directory.delete(temporary, false);
+                } catch (IOException e) {
+                    // TODO: log
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns the identified path. This method implements the pattern
+     * used to avoid problems with too many files in a single directory.
+     * <p>
+     * No sanity checks are performed on the given identifier.
+     *
+     * @param identifier data identifier
+     * @return identified file
+     */
+    private Path getPath(DataIdentifier identifier) {
+        usesIdentifier(identifier);
+        String string = identifier.toString();
+        Path path = this.path;
+        path = new Path(path, string.substring(0, 2));
+        path = new Path(path, string.substring(2, 4));
+        path = new Path(path, string.substring(4, 6));
+        return new Path(path, string);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void updateModifiedDateOnAccess(long before) {
+        minModifiedDate = before;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public int deleteAllOlderThan(long min) {
+        try {
+            return deleteOlderRecursive(path, min);
+        } catch (IOException e) {
+            // TODO: log
+        }
+        return 0;
+    }
+
+    private int deleteOlderRecursive(Path path, long min) throws IOException {
+        if (!directory.exists(path)) {
+            return 0;
+        }
+
+        int count = 0;
+        FileStatus status = directory.getFileStatus(path);
+        if (!status.isDir() && status.getPermission().getUserAction().implies(FsAction.WRITE)) {
+            synchronized (this) {
+                String fileName = path.getName();
+                if (status.getModificationTime() < min) {
+                    DataIdentifier id = new DataIdentifier(fileName);
+                    if (!inUse.containsKey(id)) {
+                        directory.delete(path, false);
+                        count++;
+                    }
+                }
+            }
+        } else if (status.isDir()) {
+            FileStatus[] list = directory.listStatus(path);
+            for (FileStatus fs : list) {
+                // recurse into files and subdirectories alike; plain files
+                // are handled by the non-directory branch above
+                count += deleteOlderRecursive(fs.getPath(), min);
+            }
+            // JCR-1396: FileDataStore Garbage Collector and empty directories
+            // Automatic removal of empty directories (but not the root!)
+            synchronized (this) {
+                if (path != this.path && directory.listStatus(path).length == 0) {
+                    directory.delete(path, true);
+                }
+            }
+        }
+        return count;
+    }
+
+    private void listRecursive(List<Path> list, Path path) throws IOException {
+        for (FileStatus status : directory.listStatus(path)) {
+            if (status.isDir()) {
+                listRecursive(list, status.getPath());
+            } else {
+                list.add(status.getPath());
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public Iterator getAllIdentifiers() throws DataStoreException {
+        try {
+            List<Path> files = new ArrayList<Path>();
+            listRecursive(files, path);
+            List<DataIdentifier> identifiers = new ArrayList<DataIdentifier>();
+            for (Path p : files) {
+                String name = p.getName();
+                if (!name.startsWith(TMP)) {
+                    DataIdentifier id = new DataIdentifier(name);
+                    identifiers.add(id);
+                }
+            }
+            return identifiers.iterator();
+        } catch (IOException e) {
+            throw new DataStoreException(e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void clearInUse() {
+        inUse.clear();
+    }
+
+    /**
+     * Get the name of the directory where this data store keeps the files.
+     *
+     * @return the full path name
+     */
+    public String getPath() {
+        return path.toString();
+    }
+
+    /**
+     * Set the name of the directory where this data store keeps the files.
+     *
+     * @param directoryName the directory.
+     */
+    public void setPath(String directoryName) {
+        this.path = new Path(directoryName);
+    }
+
+    /**
+     * Get the location where this data store keeps the files.
+     *
+     * @return the hadoop configuration property: fs.default.name
+     */
+    public String getFs_default_name() {
+        return config.get("fs.default.name");
+    }
+
+    /**
+     * Set the location where this data store keeps the files.
+     *
+     * @param name the hadoop configuration property: fs.default.name
+     */
+    public void setFs_default_name(String name) {
+        config.set("fs.default.name", name);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public int getMinRecordLength() {
+        return minRecordLength;
+    }
+
+    /**
+     * Set the minimum object length.
+     *
+     * @param minRecordLength the length
+     */
+    public void setMinRecordLength(int minRecordLength) {
+        this.minRecordLength = minRecordLength;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void close() {
+    }
+
+    //-----------------------------< internal >---------------------------------
+
+    private Path generateTempPath() throws IOException {
+        Path p;
+        do {
+            p = new Path(path, TMP + Integer.toString(random.nextInt() & 0xffff) + ".tmp");
+        } while (!directory.createNewFile(p));
+        return p;
+    }
+}

Propchange: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HadoopDataStore.java
------------------------------------------------------------------------------
    svn:eol-style = native
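
To illustrate the content addressing in addRecord() and getPath() above: a binary is first written to a temporary file, its SHA-1 digest becomes the DataIdentifier, and the file then ends up under <path>/xx/yy/zz/<digest>, where xx, yy and zz are the first three byte pairs of the hex digest. The sketch below is a minimal usage example, assuming an HDFS namenode is reachable at the default hdfs://localhost:9000; the home directory and sample content are placeholders.

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;

    import org.apache.jackrabbit.core.data.DataIdentifier;
    import org.apache.jackrabbit.core.data.DataRecord;
    import org.apache.jackrabbit.hadoop.HadoopDataStore;

    public class DataStoreUsageSketch {
        public static void main(String[] args) throws Exception {
            HadoopDataStore store = new HadoopDataStore();
            store.init("/tmp/repo-home"); // home dir argument is not used by this store

            // store a small binary; identical content always yields the same record
            InputStream in = new ByteArrayInputStream("hello, hadoop".getBytes("UTF-8"));
            DataRecord record = store.addRecord(in);
            in.close();

            DataIdentifier id = record.getIdentifier();
            // id is the 40-character hex SHA-1 of the content; the backing file is
            // stored at datastore/<id 0..2>/<id 2..4>/<id 4..6>/<id>
            System.out.println("identifier: " + id);
            System.out.println("length:     " + record.getLength());

            // read the binary back via its identifier
            InputStream back = store.getRecord(id).getStream();
            back.close();
            store.close();
        }
    }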


