hey folks, i am working on mongodb persistence manager in CQ. I implemented all the methods but the repository shutdown due to unknown error during the startup. Can anybody help me look at the code? 


import com.mongodb.*;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.gridfs.GridFSInputFile;
import org.apache.jackrabbit.core.id.NodeId;
import org.apache.jackrabbit.core.id.PropertyId;
import org.apache.jackrabbit.core.persistence.PMContext;
import org.apache.jackrabbit.core.persistence.bundle.AbstractBundlePersistenceManager;
import org.apache.jackrabbit.core.persistence.util.*;
import org.apache.jackrabbit.core.state.ItemStateException;
import org.apache.jackrabbit.core.state.NoSuchItemStateException;
import org.apache.jackrabbit.core.state.NodeReferences;
import org.bson.types.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jcr.RepositoryException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;

/**
 * Created with IntelliJ IDEA.
 * Date: 6/4/12
 * Time: 1:55 PM
 * To change this template use File | Settings | File Templates.
 */


public class MongoBundlePersistenceManager extends AbstractBundlePersistenceManager {

    protected DB itsDb;
    protected DBCollection itsNodeCollection;


    /**
     * flag for error handling
     */
    protected ErrorHandling errorHandling = new ErrorHandling();
    protected BundleBinding binding;
    /**
     * the minimum size of a property until it gets written to the blob store
     */
    private int minBlobSize = 0x1000;

    private  String itsSchema;

    /** the default logger */
    private static Logger log = LoggerFactory.getLogger(org.apache.jackrabbit.core.persistence.mongo.MongoBundlePersistenceManager.class);
    protected DBCollection itsRefCollection;
    protected BLOBStore itsBlobStore;

    @Override
    protected NodePropBundle loadBundle(NodeId id) throws ItemStateException {

        BasicDBObject query = new BasicDBObject();
        query.put("_id", id.toString());
        BasicDBObject theDBObject = (BasicDBObject) itsNodeCollection.findOne(query);
        if (theDBObject == null) {
            return null;
        }

        Binary theD = (Binary) theDBObject.get("d");
        InputStream in = new ByteArrayInputStream(theD.getData());
        try {
            return binding.readBundle(in, id);
        } catch (IOException e) {
            String msg = "failed to read bundle: " + id + ": " + e;
            log.error(msg,e);
            throw new ItemStateException(msg, e);
        }
    }

    /** initial size of buffer used to serialize objects */
    protected static final int INITIAL_BUFFER_SIZE = 1024;
    @Override
    protected synchronized void storeBundle(NodePropBundle bundle) throws ItemStateException {
        try {
            DBObject theQuery = new BasicDBObject();
            theQuery.put("_id", bundle.getId().toString());

            DBObject theDbObject = new BasicDBObject();
            theDbObject.put("_id", bundle.getId().toString());
            ByteArrayOutputStream out = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
            binding.writeBundle(out, bundle);
            theDbObject.put("d", out.toByteArray());

            if( bundle.getParentId() != null) {
                theDbObject.put("p", bundle.getParentId().toString());
            }

            itsNodeCollection.update(theQuery, theDbObject, true, false);

        } catch (IOException e) {
            String msg = "failed to write bundle: " + bundle.getId() + ": " + e;
            log.error(msg, e);
            throw new ItemStateException(msg, e);
        }
    }

    @Override
    protected synchronized void destroyBundle(NodePropBundle bundle) throws ItemStateException {
        //todo: delete entire subtree in single query
        DBObject theDBObject = new BasicDBObject();
        theDBObject.put("_id", bundle.getId().toString());
        itsNodeCollection.remove(theDBObject);
    }

    @Override
    protected synchronized void destroy(NodeReferences refs) throws ItemStateException {
        DBObject theDBObject = new BasicDBObject();
        theDBObject.put("_id", refs.getTargetId().toString());
        itsRefCollection.remove(theDBObject);
    }

    @Override
    protected synchronized void store(NodeReferences refs) throws ItemStateException {

        DBObject theQuery = new BasicDBObject();
        theQuery.put("_id", refs.getTargetId().toString());

        DBObject theRef = new BasicDBObject();
        theRef.put("_id", refs.getTargetId().toString());

        ByteArrayOutputStream out = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
        // serialize references
        try {
            org.apache.jackrabbit.core.persistence.util.Serializer.serialize(refs, out);
            theRef.put("d", out.toByteArray());
            itsRefCollection.update(theQuery, theRef, true, false);
        } catch (Exception e) {
            String msg = "failed to write " + refs;
            log.error(msg, e);
            throw new ItemStateException(msg, e);
        }


    }


    public void init(PMContext inContext) throws Exception {
        super.init(inContext);
        log.info("init mongo db persistance manager: " + getClass().getName());
        Mongo m = new Mongo("localhost");
        itsDb = m.getDB(getSchema());
        itsNodeCollection = itsDb.getCollection("node");
        itsRefCollection = itsDb.getCollection("ref");
        itsBlobStore = new MongoGridFSBLOBStore();
        // load namespaces
        binding = new BundleBinding(errorHandling, itsBlobStore, getNsIndex(), getNameIndex(), context.getDataStore());
        binding.setMinBlobSize(minBlobSize);
        log.info("complete init mongo db persistance manager: " + getClass().getName());
    }

    public NodeReferences loadReferencesTo(NodeId id) throws NoSuchItemStateException, ItemStateException {

        BasicDBObject theQuery = new BasicDBObject();
        theQuery.put("_id", id.toString());

        BasicDBObject theRefDb = (BasicDBObject) itsRefCollection.findOne(theQuery);
        if(theRefDb == null) {
            throw new NoSuchItemStateException(id.toString());
        }

        Binary theD = (Binary) theRefDb.get("d");
        InputStream in = new ByteArrayInputStream(theD.getData());

        NodeReferences theRef = new NodeReferences(id);
        try {
            org.apache.jackrabbit.core.persistence.util.Serializer.deserialize(theRef, in);
            return theRef;
        } catch (Exception e) {
            if (e instanceof NoSuchItemStateException) {
                throw (NoSuchItemStateException) e;
            }
            String msg = "failed to read references: " + id;
            log.error(msg, e);
            throw new ItemStateException(msg, e);
        }
    }

    public synchronized boolean existsReferencesTo(NodeId targetId) throws ItemStateException {

        BasicDBObject theDBObject = new BasicDBObject();
        theDBObject.put("_id", targetId.toString());
        BasicDBObject theRefDb = (BasicDBObject) itsRefCollection.findOne(theDBObject);
        return theRefDb != null;
    }

    public Iterable<NodeId> getAllNodeIds(NodeId after, int maxCount) throws ItemStateException, RepositoryException {
        DBObject theSort = new BasicDBObject();
        theSort.put("_id", "1");

        DBObject theQuery = new BasicDBObject();
        theQuery.put("_id", after.toString());

        theQuery =        QueryBuilder.start().greaterThan(theQuery).get();
        DBCursor theCursor = itsRefCollection.find(theQuery).sort(theSort).limit(maxCount);

        ArrayList<NodeId> result = new ArrayList<NodeId>();

        for (DBObject theDBObject : theCursor) {
            result.add(NodeId.valueOf(theDBObject.get("_id").toString()));
        }

        return result;
    }


    @Override
    protected BLOBStore getBlobStore() {
        return itsBlobStore;  //To change body of implemented methods use File | Settings | File Templates.
    }

    public String getSchema() {
        return itsSchema;
    }

    public void setSchema(String inSchema) {
        itsSchema = inSchema;
    }


    private  class MongoGridFSBLOBStore implements BLOBStore {

        private GridFS itsGripFS;

        public MongoGridFSBLOBStore() {

            itsGripFS = new GridFS( itsDb );

        }

        public String createId(PropertyId id, int index) {

            StringBuffer buf = new StringBuffer();
                      buf.append(id.getParentId().toString());
                      buf.append('.');
                      buf.append(getNsIndex().stringToIndex(id.getName().getNamespaceURI()));
                      buf.append('.');
                      buf.append(getNameIndex().stringToIndex(id.getName().getLocalName()));
                      buf.append('.');
                      buf.append(index);
                      return buf.toString();
        }

        public void put(String blobId, InputStream in, long size) throws Exception {
            GridFSInputFile theGridFSInputFile = itsGripFS.createFile( in, true );
            theGridFSInputFile.put("_id", blobId);
            theGridFSInputFile.save();


        }

        public InputStream get(String blobId) throws Exception {
            GridFSDBFile out = itsGripFS.findOne( new BasicDBObject( "_id" , blobId ) );
            return out.getInputStream();
        }

        public boolean remove(String blobId) throws Exception {
           itsGripFS.remove( new BasicDBObject( "_id" , blobId ) );
            return true;
        }
    }
}


- Steven


*****
This e-mail message is intended only for the designated recipient(s) named above. The information contained in this e-mail and any attachments may be confidential or legally privileged. If you are not the intended recipient, you may not review, retain, copy, redistribute or use this e-mail or any attachment for any purpose, or disclose all or any part of its contents. If you have received this e-mail in error, please immediately notify the sender by reply e-mail and permanently delete this e-mail and any attachments from your computer system.
*****