river-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nic...@apache.org
Subject svn commit: r724979 [13/14] - in /incubator/river/jtsk/skunk/niclas1/services/outrigger: ./ src/ src/main/ src/main/java/ src/main/java/com/ src/main/java/com/sun/ src/main/java/com/sun/jini/ src/main/java/com/sun/jini/outrigger/ src/main/java/com/sun/...
Date Wed, 10 Dec 2008 05:13:55 GMT
Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/BackEnd.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/BackEnd.java?rev=724979&view=auto
==============================================================================
--- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/BackEnd.java (added)
+++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/BackEnd.java Tue Dec  9 21:13:53 2008
@@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.outrigger.snaplogstore;
+
+import com.sun.jini.constants.TimeConstants;
+import com.sun.jini.outrigger.OutriggerServerImpl;
+import com.sun.jini.outrigger.Recover;
+import com.sun.jini.outrigger.StoredObject;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.jini.space.InternalSpaceException;
+
+/**
+ * Back end of snapshot log store. This class consumes logs written by
+ * LogOutputFile and stores the state as serilalzied objects. The class
+ * processes the logs to optimize what is stored in the snapshot. For
+ * example, a take log record will cause the removal of a write log
+ * record with the same id (if the transaction is null).  <p>
+ *
+ * Likewise, cancels will cause the removal of write and register
+ * records.  Also renew records update the expiration of the entry or
+ * registration and are not stored directly in the database.
+ */
+class BackEnd implements Observer
+{
+
+    // The following data represent the persistent
+    // state.
+    private Long sessionId;
+    private StoredObject joinState;
+    private Map entries;
+    private Map registrations;
+    private Map pendingTxns;
+    private byte topUuid[];
+    private LastLog lastLog;
+
+    /**
+     * Number of times to attempt to restart the consumer thread.
+     */
+    private int retry = 3;
+
+    /**
+     * Snapshot object
+     */
+    private SnapshotFile snapshotFile;
+
+    /**
+     * Keep logs and snapshot tied, though not necessary
+     */
+    private final int SNAPSHOT_VERSION = LogFile.LOG_VERSION;
+
+    /**
+     * The base name for the log files.
+     */
+    private String logFileBase;
+
+    /**
+     * The base name for the snapshot files
+     */
+    private String snapshotFileBase;
+
+    /**
+     * Log file consumer thread.
+     */
+    private ConsumerThread consumer;
+
+    /**
+     * Max time to wait for the consumer thread to die on destroy
+     */
+    private final static long WAIT_FOR_THREAD = 1 * TimeConstants.MINUTES;
+
+    /**
+     * Logger for logging persistent store related information
+     */
+    private static final Logger logger =
+        Logger.getLogger( OutriggerServerImpl.storeLoggerName );
+
+    /**
+     * Create a new <code>BackEnd</code> with the given <code>path</code>.
+     */
+    BackEnd( String path )
+    {
+        logFileBase = new File( path, LogFile.LOG_TYPE ).getAbsolutePath();
+        snapshotFileBase = new File( path, "Snapshot." ).getAbsolutePath();
+    }
+
+    /**
+     * Setup the database store and recover any existing state.
+     */
+    void setupStore( Recover space )
+    {
+
+        // Recover the snapshot (if any)
+        //
+        recoverSnapshot();
+
+        // Consume any remaining log files.
+        //
+        consumeLogs( true );
+
+        if( logger.isLoggable( Level.FINE ) )
+        {
+            logger.log( Level.FINE, "recoverSnapshot: number of entries:{0}, " +
+                                    "number of pendingTxns:{1}, number of registrations:{2}",
+                        new Object[]{ new Integer( entries.size() ),
+                                      new Integer( pendingTxns.size() ),
+                                      new Integer( registrations.size() ) } );
+        }
+
+        // Recover the session id
+        //
+        if( sessionId != null )
+        {
+            space.recoverSessionId( sessionId.longValue() );
+        }
+
+        // Recover the top level Uuid
+        //
+        if( topUuid != null )
+        {
+            space.recoverUuid( ByteArrayWrapper.toUuid( topUuid ) );
+        }
+
+        // Recover the join state
+        //
+        if( joinState != null )
+        {
+            try
+            {
+                space.recoverJoinState( joinState );
+            }
+            catch( Exception e )
+            {
+                throw logAndThrowRecoveryException(
+                    "Error recovering join state", e );
+            }
+        }
+
+        // Recover the entries
+        //
+        try
+        {
+            Iterator i = entries.values().iterator();
+
+            while( i.hasNext() )
+            {
+                space.recoverWrite( (Resource) i.next(), null );
+            }
+        }
+        catch( Exception e )
+        {
+            throw logAndThrowRecoveryException( "Error recovering entries", e );
+        }
+
+        // Recover the prepared transactions and remove any
+        // non-prepared ones.
+        try
+        {
+            Iterator i = pendingTxns.values().iterator();
+
+            while( i.hasNext() )
+            {
+                PendingTxn pt = (PendingTxn) i.next();
+
+                // If the pending transaction was not recovered
+                // (i.e. it was not prepared) then we can remove it.
+                //
+                if( !pt.recover( space ) )
+                {
+                    i.remove();
+                }
+            }
+        }
+        catch( Exception e )
+        {
+            throw logAndThrowRecoveryException( "Error recovering transactions",
+                                                e );
+        }
+
+        // Recover the registrations
+        //
+        try
+        {
+            Iterator i = registrations.values().iterator();
+
+            while( i.hasNext() )
+            {
+                Registration reg = (Registration) i.next();
+
+                final BaseObject[] templates = reg.getTemplates();
+
+                space.recoverRegister( reg, reg.getType(), templates );
+            }
+        }
+        catch( Exception e )
+        {
+            throw logAndThrowRecoveryException(
+                "Error recovering registrations", e );
+        }
+        startConsumer();
+    }
+
+    private void recoverSnapshot()
+    {
+        try
+        {
+            File[] snapshot = new File[1];
+            snapshotFile = new SnapshotFile( snapshotFileBase, snapshot );
+
+            if( snapshot[ 0 ] == null )
+            {
+
+                // no snapshot, initialize fields and return
+                sessionId = null;
+                entries = new HashMap();
+                registrations = new HashMap();
+                pendingTxns = new HashMap();
+                topUuid = null;
+                lastLog = null;
+                return;
+            }
+
+            final ObjectInputStream in =
+                new ObjectInputStream( new BufferedInputStream(
+                    new FileInputStream( snapshot[ 0 ] ) ) );
+
+            final int version = in.readInt();
+            if( version != SNAPSHOT_VERSION )
+            {
+                logAndThrowRecoveryException(
+                    "Wrong file version:" + version, null );
+            }
+
+            sessionId = (Long) in.readObject();
+            joinState = (StoredObject) in.readObject();
+            entries = (Map) in.readObject();
+            registrations = (Map) in.readObject();
+            pendingTxns = (Map) in.readObject();
+            topUuid = (byte[]) in.readObject();
+            lastLog = (LastLog) in.readObject();
+            in.close();
+        }
+        catch( RuntimeException t )
+        {
+            throw t;
+        }
+        catch( Throwable t )
+        {
+            throw logAndThrowRecoveryException( "Problem recovering snapshot", t );
+        }
+    }
+
+    private void startConsumer()
+    {
+
+        // Create and start the log consumer thread
+        //
+        consumer = new ConsumerThread();
+        consumer.start();
+    }
+
+    /**
+     * Thread to consume log files. <code>LogOutputFile</code> calls
+     * <code>update</code> (through the <code>Observer</code> interface
+     * each time a log file is written.
+     */
+    private class ConsumerThread extends Thread
+    {
+
+        private boolean more = false;
+        volatile private boolean interrupted = false;
+
+        ConsumerThread()
+        {
+        }
+
+        public void run()
+        {
+            try
+            {
+                while( !interrupted )
+                {
+
+                    // This block is first because when start is
+                    // called in setup there will not be any log files
+                    // to process. LogOutputFile is created after
+                    // setup returns.
+                    //
+                    synchronized( this )
+                    {
+                        while( !more )
+                        {
+                            wait();
+                        }
+                        more = false;
+                    }
+
+                    // There is a small window between the wait and
+                    // the consumeLogs where update can be called,
+                    // setting more to true and yet consumeLogs
+                    // actually consumes the log file that caused the
+                    // update. This unlikely situation is ok since
+                    // consumeLogs does the right thing if there are
+                    // no logs to process We could sync around
+                    // consumeLogs but we don't want LogOutputFile to
+                    // wait.
+                    //
+                    consumeLogs( false );
+                }
+            }
+            catch( InterruptedException exit )
+            {
+            }
+        }
+
+        // Cause the thread to consume a log file.
+        //
+        synchronized private void update()
+        {
+            more = true;    // For the case it is processing log files
+            notify();        // For the case is it waiting
+        }
+
+        // Set a local flag just in case someone clears the thread's own
+        // interrupted status.
+        //
+        public void interrupt()
+        {
+            interrupted = true;
+            super.interrupt();
+        }
+    }
+
+    //---------------------
+    // Required by Observer
+    //---------------------
+
+    public void update( Observable source, Object arg )
+    {
+
+        if( !consumer.isAlive() )
+        {
+            if( retry > 0 )
+            {
+                logger.log( Level.INFO,
+                            "Consumer thread died, attempting restart" );
+                retry--;
+                startConsumer();
+            }
+            else
+            {
+                logger.log( Level.SEVERE,
+                            "Consumer thread no longer running" );
+                return;
+            }
+        }
+        consumer.update();
+    }
+
+    /**
+     * Destroy the consumer thread and database
+     */
+    void destroy()
+    {
+        try
+        {
+            consumer.interrupt();
+
+            // wait for consumeLogs to finish in order to avoid errors
+            // once the database and log files are destroyed.
+            //
+            consumer.join( WAIT_FOR_THREAD );
+
+        }
+        catch( InterruptedException ignore )
+        {
+        }
+        finally
+        {
+            try
+            {
+                if( snapshotFile != null )
+                {
+                    snapshotFile.destroy();
+                }
+            }
+            catch( Throwable t )
+            {
+                logger.log( Level.INFO,
+                            "Exception encounter while destroying store", t );
+            }
+        }
+    }
+
+    /**
+     * Stop the consumer and close the database.
+     */
+    void close()
+    {
+        consumer.interrupt();
+        // Wait forever, can't close database until
+        // consumer stops (which during startup should
+        // not be long.
+        try
+        {
+            consumer.join();
+        }
+        catch( InterruptedException e )
+        {
+            // never happens
+        }
+        if( snapshotFile != null )
+        {
+            try
+            {
+                snapshotFile.close();
+            }
+            catch( Throwable t )
+            {
+                logger.log( Level.INFO,
+                            "Exception encounter while closing store", t );
+            }
+        }
+    }
+
+    /**
+     * Return the pending transaction description for the given
+     * transaction, creating the object and adding it to the table if
+     * necessary.
+     */
+    private PendingTxn pendingTxn( Long txnId )
+    {
+        PendingTxn pt = (PendingTxn) pendingTxns.get( txnId );
+        if( pt == null )
+        {
+            pt = new PendingTxn( txnId );
+            pendingTxns.put( txnId, pt );
+        }
+        return pt;
+    }
+
+    /**
+     * Remove a pending transaction from the table.  If it isn't there,
+     * this call is harmless.
+     */
+    private void removePendingTxn( Long txnId )
+    {
+        pendingTxns.remove( txnId ); // if it fails, it wasn't there to remove
+    }
+
+    // ------------------------------------------------------------
+    //                  Log stuff
+    // ------------------------------------------------------------
+
+    // The following methods are called when a recovered log element
+    // is read from the log file. Some methods, writeOp and takeOp
+    // can also be called when a pending transaction is committed.
+    //
+
+    /**
+     * This method sets the session id in the database. It's value is
+     * only used during recovery after a restart.
+     */
+    void bootOp( long time, long session )
+    {
+        sessionId = new Long( session );
+        if( logger.isLoggable( Level.FINE ) )
+        {
+            logger.log( Level.FINE, "bootOp({0})", new Date( time ) );
+        }
+    }
+
+    /**
+     * Record the join state.
+     */
+    void joinStateOp( StoredObject state )
+    {
+        joinState = state;
+        if( logger.isLoggable( Level.FINE ) )
+        {
+            logger.log( Level.FINE, "joinStateOp()" );
+        }
+    }
+
+    /**
+     * This method records a logged write operation. If under a
+     * transaction the resource is held in a list for the pending
+     * transaction. When committed this method will be called again
+     * with the resource and a null transaction id.
+     */
+    void writeOp( Resource entry, Long txnId )
+    {
+        if( logger.isLoggable( Level.FINE ) )
+        {
+            logger.log( Level.FINE, "writeOp({0},{1})",
+                        new Object[]{ entry, txnId } );
+        }
+
+        if( txnId != null )
+        {
+            pendingTxn( txnId ).addWrite( entry );
+        }
+        else
+        {
+            entries.put( entry.getCookieAsWrapper(), entry );
+        }
+    }
+
+    /**
+     * This method records a logged take operation. If under a
+     * transaction the resource is held in a list for the pending
+     * transaction. When committed this method will be called again
+     * with the resource and a null transaction id.
+     */
+    void takeOp( byte cookie[], Long txnId )
+    {
+        if( logger.isLoggable( Level.FINE ) )
+        {
+            logger.log( Level.FINE, "takeOp({0},{1})",
+                        new Object[]{ ByteArrayWrapper.toUuid( cookie ), txnId } );
+        }
+
+        if( txnId != null )
+        {
+            pendingTxn( txnId ).addTake( cookie );
+        }
+        else
+        {
+            entries.remove( new ByteArrayWrapper( cookie ) );
+        }
+    }
+
+    /*
+     * This method records a logged event registration.
+     */
+    void registerOp( Registration registration )
+    {
+        logger.log( Level.FINE, "registerOp({0})", registration );
+
+        registrations.put( registration.getCookieAsWrapper(), registration );
+    }
+
+    /**
+     * This method processes a logged renew operation. Renew operations
+     * apply to resources passed into writeOp and registerOp.
+     */
+    void renewOp( byte cookie[], long expiration )
+    {
+        if( logger.isLoggable( Level.FINE ) )
+        {
+            logger.log( Level.FINE, "renewOp({0},{1})",
+                        new Object[]{ ByteArrayWrapper.toUuid( cookie ),
+                                      new Long( expiration ) } );
+        }
+        final ByteArrayWrapper baw = new ByteArrayWrapper( cookie );
+
+        Resource resource;
+
+        if( ( resource = (Resource) entries.get( baw ) ) == null )
+        {
+            // not an entry, try event registrations
+            if( ( resource = (Resource) registrations.get( baw ) ) == null )
+            {
+
+                // No registration either, try transactional writes
+                Iterator i = pendingTxns.values().iterator();
+                while( i.hasNext() )
+                {
+                    if( ( resource = ( (PendingTxn) i.next() ).get( baw ) ) != null )
+                    {
+                        break;
+                    }
+                }
+            }
+        }
+        if( resource != null )
+        {
+            resource.setExpiration( expiration );
+        }
+    }
+
+    /**
+     * This method processes a logged cancel operation. Cancel operations
+     * apply to resources passed into writeOp and registerOp.
+     */
+    void cancelOp( byte cookie[] )
+    {
+        if( logger.isLoggable( Level.FINE ) )
+        {
+            logger.log( Level.FINE, "cancelOp({0})",
+                        ByteArrayWrapper.toUuid( cookie ) );
+        }
+        final ByteArrayWrapper baw = new ByteArrayWrapper( cookie );
+
+        if( entries.remove( baw ) == null )
+        {
+            if( registrations.remove( baw ) == null )
+            {
+
+                Iterator i = pendingTxns.values().iterator();
+                while( i.hasNext() )
+                {
+                    if( ( (PendingTxn) i.next() ).remove( baw ) != null )
+                    {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * This method prepares a pending transaction.
+     */
+    void prepareOp( Long txnId, StoredObject transaction )
+    {
+        logger.log( Level.FINE, "prepareOp({0})", txnId );
+
+        PendingTxn pt = pendingTxn( txnId );
+        pt.prepare( transaction );
+    }
+
+    /**
+     * This method commits a pending transaction.
+     */
+    void commitOp( Long txnId )
+    {
+        logger.log( Level.FINE, "commitOp({0})", txnId );
+
+        PendingTxn pt = pendingTxn( txnId );
+        pt.commit( this );
+        removePendingTxn( txnId );
+    }
+
+    /**
+     * This method aborts a pending transaction.
+     */
+    void abortOp( Long txnId )
+    {
+        logger.log( Level.FINE, "abortOp({0})", txnId );
+
+        removePendingTxn( txnId );
+    }
+
+    /**
+     * This method records the service's top level <code>Uuid</code>
+     *
+     * @param uuid The service's <code>Uuid</code> represented as a
+     *             <code>byte[16]</code>.
+     */
+    void uuidOp( byte[] uuid )
+    {
+        if( logger.isLoggable( Level.FINE ) )
+        {
+            logger.log( Level.FINE, "uuidOp({0})",
+                        ByteArrayWrapper.toUuid( uuid ) );
+        }
+
+        topUuid = uuid;
+    }
+
+    /**
+     * Consume the log files that exist.  If <code>all</code> is
+     * <code>true</code>, all found log files will be processed.
+     * If <code>log</code> is <code>false</code>, then all but the
+     * most recent will be processed; this will prevent the back
+     * end from reading the log file that is currently being
+     * produced by the front end.
+     */
+    private void consumeLogs( boolean all )
+    {
+        Iterator it;
+        try
+        {
+            it = LogInputFile.logs( logFileBase, all );
+        }
+        catch( IOException e )
+        {
+            final String msg = "couldn't open logs";
+            final InternalSpaceException ise =
+                new InternalSpaceException( msg, e );
+            logger.log( Level.SEVERE, msg, ise );
+            throw ise;
+        }
+
+        while( it.hasNext() )
+        {
+            LogInputFile log = (LogInputFile) it.next();
+            logger.log( Level.FINE, "processing {0})", log );
+
+            if( log == null )        // file already consumed
+            {
+                continue;
+            }
+
+            try
+            {
+                String logFile = log.toString();
+                if( lastLog == null || !lastLog.sameAs( logFile ) )
+                {
+                    log.consume( this );
+                }
+                lastLog = new LastLog( logFile );
+
+                ObjectOutputStream out = snapshotFile.next();
+
+                out.writeInt( SNAPSHOT_VERSION );
+                out.writeObject( sessionId );
+                out.writeObject( joinState );
+                out.writeObject( entries );
+                out.writeObject( registrations );
+                out.writeObject( pendingTxns );
+                out.writeObject( topUuid );
+                out.writeObject( lastLog );
+                snapshotFile.commit();
+            }
+            catch( IOException e )
+            {
+                final String msg = "error writing snapshot";
+                final InternalSpaceException ise =
+                    new InternalSpaceException( msg, e );
+                logger.log( Level.SEVERE, msg, ise );
+                throw ise;
+            }
+            log.finished();
+        }
+    }
+
+    /**
+     * This class remembers which log file was the last to be
+     * successfully consumed.  If the recovery mechanism reopens this
+     * file, then it will skip its contents -- this indicates a crash
+     * happened after the contents were committed to the snapshot but
+     * before the file was unlinked.
+     */
+    private static class LastLog implements Serializable
+    {
+        private String logFile;
+        private long timeStamp;
+
+        LastLog( String path )
+        {
+            logFile = path;
+            timeStamp = new File( logFile ).lastModified();
+        }
+
+        boolean sameAs( String otherPath )
+        {
+            if( !logFile.equals( otherPath ) )
+            {
+                return false;
+            }
+            return ( new File( otherPath ).lastModified() == timeStamp );
+        }
+    }
+
+    /**
+     * Log and throw an InternalSpaceException to flag a store
+     * recovery problem.
+     */
+    private InternalSpaceException logAndThrowRecoveryException(
+        String msg, Throwable nested )
+    {
+        final InternalSpaceException e =
+            new InternalSpaceException( msg, nested );
+        logger.log( Level.SEVERE, msg, e );
+        throw e;
+    }
+}

Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/BaseObject.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/BaseObject.java?rev=724979&view=auto
==============================================================================
--- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/BaseObject.java (added)
+++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/BaseObject.java Tue Dec  9 21:13:53 2008
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.outrigger.snaplogstore;
+
+import com.sun.jini.outrigger.StorableObject;
+import com.sun.jini.outrigger.StoredObject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import net.jini.space.InternalSpaceException;
+
+/**
+ * Top level wrapper class for persisting outrigger objects.
+ * The target object is serialized and stored here as a byte
+ * array.
+ */
+class BaseObject implements StoredObject, Serializable
+{
+    static final long serialVersionUID = -400804064969360164L;
+
+    private byte[] blob;
+
+    BaseObject( StorableObject object )
+    {
+        try
+        {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream( baos );
+            object.store( oos );
+            oos.flush();
+            blob = baos.toByteArray();
+            oos.close();
+        }
+        catch( IOException e )
+        {
+            throw new InternalSpaceException( "Exception serializing resource", e );
+        }
+    }
+
+    public void restore( StorableObject object )
+        throws IOException, ClassNotFoundException
+    {
+        ObjectInputStream ois = new ObjectInputStream( new ByteArrayInputStream( blob ) );
+        object.restore( ois );
+        ois.close();
+    }
+}

Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/ByteArrayWrapper.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/ByteArrayWrapper.java?rev=724979&view=auto
==============================================================================
--- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/ByteArrayWrapper.java (added)
+++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/ByteArrayWrapper.java Tue Dec  9 21:13:53 2008
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.outrigger.snaplogstore;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import net.jini.id.Uuid;
+import net.jini.id.UuidFactory;
+
+/**
+ * In the backend <code>Uuid</code>s are represented using
+ * <code>byte[16]</code>s. This works fine is most places, but
+ * sometimes we need to use a <code>Uuid</code> as a key in a hash
+ * table. Arrays do not make good hash table keys so we wrap the array
+ * in one of these to provide suitable implementations of
+ * <code>hashCode</code> and <code>equals</code>.
+ * <p>
+ * This method also has utility methods for converting
+ * <code>Uuid</code>s to and from <code>byte[16]</code>s.
+ */
+class ByteArrayWrapper implements Serializable
+{
+    /**
+     * The 16 bytes being wrapped
+     */
+    private byte[] uuid;
+
+    /**
+     * A 32 bit hash of uuid
+     */
+    private int hash;
+
+    /**
+     * Create a new <code>ByteArrayWrapper</code> that
+     * wraps the provided array.
+     *
+     * @param v The array to wrap.
+     * @throws IllegalArgumentException if <code>v.length</code> is
+     *                                  not 16.
+     */
+    ByteArrayWrapper( byte v[] )
+    {
+        uuid = v;
+        // Same hash Uuid uses
+        hash = hashFor( v );
+    }
+
+    public boolean equals( Object o )
+    {
+        if( !( o instanceof ByteArrayWrapper ) )
+        {
+            return false;
+        }
+
+        if( o == null )
+        {
+            return false;
+        }
+
+        final byte[] ouuid = ( (ByteArrayWrapper) o ).uuid;
+        return Arrays.equals( uuid, ouuid );
+    }
+
+    public int hashCode()
+    {
+        return hash;
+    }
+
+    /**
+     * Encode the passed <code>Uuid</code> in to a newly allocated
+     * <code>byte[16]</code> in big-endian byte order.
+     *
+     * @param uuid the <code>Uuid</code> to encode.
+     * @return A new <code>byte[16]</code> initialized to the
+     *         same bit pattern as <code>uuid</code>.
+     * @throws NullPointerException if <code>uuid</code> is
+     *                              <code>null</code>.
+     */
+    static byte[] toByteArray( Uuid uuid )
+    {
+        final byte rslt[] = new byte[16];
+        long bits0 = uuid.getMostSignificantBits();
+        for( int i = 7; i >= 0; i-- )
+        {
+            rslt[ i ] = (byte) bits0;
+            bits0 = bits0 >>> 8;
+        }
+
+        long bits1 = uuid.getLeastSignificantBits();
+        for( int i = 15; i >= 8; i-- )
+        {
+            rslt[ i ] = (byte) bits1;
+            bits1 = bits1 >>> 8;
+        }
+
+        return rslt;
+    }
+
+    /**
+     * Create a new <code>Uuid</code> that matches the bit pattern
+     * in the passed <code>byte[]</code>. Assumes the bit pattern
+     * is in big-endian byte order.
+     *
+     * @param bits the <code>byte[]</code> with bit pattern
+     * @return A new <code>Uuid</code> that matches the
+     *         passed <code>byte[]</code>.
+     * @throws NullPointerException     if <code>uuid</code> is
+     *                                  <code>null</code>.
+     * @throws IllegalArgumentException if <code>bits.length</code>
+     *                                  is not 16.
+     */
+    static Uuid toUuid( byte bits[] )
+    {
+        if( bits.length != 16 )
+        {
+            throw new IllegalArgumentException( "uuid.length must be 16" );
+        }
+
+        long bits0 = 0;
+        for( int i = 0; i < 7; i++ )
+        {
+            bits0 = bits0 | ( bits[ i ] & 0xFF );
+            bits0 = bits0 << 8;
+        }
+        bits0 = bits0 | ( bits[ 7 ] & 0xFF );
+
+        long bits1 = 0;
+        for( int i = 8; i < 15; i++ )
+        {
+            bits1 = bits1 | ( bits[ i ] & 0xFF );
+            bits1 = bits1 << 8;
+        }
+        bits1 = bits1 | ( bits[ 15 ] & 0xFF );
+
+        return UuidFactory.create( bits0, bits1 );
+    }
+
+    /**
+     * Compute an equivalent hash to <code>Uuid</code>.
+     *
+     * @throws IllegalArgumentException if <code>uuid.length</code>
+     *                                  is not 16.
+     */
+    static int hashFor( byte uuid[] )
+    {
+        if( uuid.length != 16 )
+        {
+            throw new IllegalArgumentException( "uuid.length must be 16" );
+        }
+        return
+            ( ( uuid[ 15 ] ^ uuid[ 11 ] ^ uuid[ 7 ] ^ uuid[ 3 ] ) << 24 ) |
+            ( ( uuid[ 14 ] ^ uuid[ 10 ] ^ uuid[ 6 ] ^ uuid[ 2 ] ) << 16 ) |
+            ( ( uuid[ 13 ] ^ uuid[ 9 ] ^ uuid[ 5 ] ^ uuid[ 1 ] ) << 8 ) |
+            ( ( uuid[ 12 ] ^ uuid[ 8 ] ^ uuid[ 4 ] ^ uuid[ 0 ] ) << 0 );
+    }
+}

Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogFile.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogFile.java?rev=724979&view=auto
==============================================================================
--- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogFile.java (added)
+++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogFile.java Tue Dec  9 21:13:53 2008
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.outrigger.snaplogstore;
+
+import com.sun.jini.outrigger.OutriggerServerImpl;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The base class for the logging file classes.  This class provides
+ * the common functionality, but you should not instantiate it.
+ * <p>
+ * (Note -- I use <code>protected</code> here as an advisory notice.
+ * Clearly, since this is package code, all classes in the package have
+ * access, but fields marked <code>protected</code> are expected to be
+ * used only by subclasses.  Use good taste.)
+ *
+ * @author Sun Microsystems, Inc.
+ * @see LogOutputFile
+ * @see LogInputFile
+ */
+class LogFile
+{
+    /**
+     * The directory in which the log files live.
+     */
+    protected File baseDir;
+
+    /**
+     * The base part of the file name (e.g., <code>"log."</code> for
+     * <code>"log.0"</code>, <code>"log.1"</code>, ...)
+     */
+    protected String baseFile;
+
+    /**
+     * The type of log stream
+     */
+    static final String LOG_TYPE = "LogStore";
+
+    /**
+     * The version of the log stream (the highest one known).
+     */
+    protected static final int LOG_VERSION = 3;
+
+    /**
+     * A log entry that records a boot.
+     */
+    protected static final byte BOOT_OP = 1;
+    /**
+     * A log entry that records the join state.
+     */
+    protected static final byte JOINSTATE_OP = 11;
+    /**
+     * A log entry that records a <code>write</code>.
+     */
+    protected static final byte WRITE_OP = 2;
+    /**
+     * A log entry that records a <code>take</code>.
+     */
+    protected static final byte TAKE_OP = 3;
+    /**
+     * A log entry that records a <code>notify</code>.
+     */
+    protected static final byte REGISTER_OP = 4;
+    /**
+     * A log entry that records a <code>notify</code>.
+     */
+    protected static final byte RENEW_OP = 5;
+    /**
+     * A log entry that records a notification and new sequence number.
+     */
+    protected static final byte NOTIFIED_OP = 6;
+    /**
+     * A log entry that records a <code>cancel</code>.
+     */
+    protected static final byte CANCEL_OP = 7;
+    /**
+     * A log entry that records a transaction <code>prepare</code>.
+     */
+    protected static final byte PREPARE_OP = 8;
+    /**
+     * A log entry that records a transaction <code>commit</code>.
+     */
+    protected static final byte COMMIT_OP = 9;
+    /**
+     * A log entry that records a transaction <code>abort</code>.
+     */
+    protected static final byte ABORT_OP = 10;
+    /**
+     * A log entry that records the service's <code>Uuid</code>.
+     */
+    protected static final byte UUID_OP = 12;
+    /**
+     * A log entry that records a batch <code>write</code>.
+     */
+    protected static final byte BATCH_WRITE_OP = 13;
+    /**
+     * A log entry that records a batch <code>take</code>.
+     */
+    protected static final byte BATCH_TAKE_OP = 14;
+
+    /**
+     * Logger for logging persistent store related information
+     */
+    private static final Logger logger =
+        Logger.getLogger( OutriggerServerImpl.storeLoggerName );
+
+    /**
+     * Create a log file with the given base directory, base file name
+     * within the directory.  This is intended only for
+     * when you are sure of the exact values -- no modifications are
+     * made to the values.
+     */
+    protected LogFile( File baseDir, String baseFile )
+    {
+        this.baseDir = baseDir;
+        this.baseFile = baseFile;
+    }
+
+    /**
+     * Create a log file from the given template.  If
+     * <code>basePath</code> has a directory component, it is used as
+     * the base directory.  Otherwise the base directory is
+     * <code>"."</code>.  If <code>basePath</code> names a directory,
+     * the base name will be <code>""</code>.  Otherwise the file
+     * component is used as the base, with a "." added at the end if it
+     * is not already present.
+     */
+    protected LogFile( String basePath ) throws IOException
+    {
+        baseDir = new File( basePath );
+        if( baseDir.isDirectory() )
+        {
+            baseFile = "";
+        }
+        else
+        {
+            baseFile = baseDir.getName();
+            String pname = baseDir.getParent();
+            if( pname == null )
+            {
+                pname = ".";
+            }
+            baseDir = new File( pname );
+            if( baseFile.charAt( baseFile.length() - 1 ) != '.' )
+            {
+                baseFile += ".";
+            }
+        }
+    }
+
+    /**
+     * Fill in a list of existing matching log files, oldest to newest,
+     * returning the highest number used as a suffix, or -1 if
+     * no files were found.  If two files have the same time, they are
+     * sorted by the numeric value of the suffix.
+     */
+    int existingLogs( Collection files )
+    {
+        if( logger.isLoggable( Level.FINE ) )
+        {
+            logger.log( Level.FINE, "scanning {0} for {1} baseFile",
+                        new Object[]{ baseDir, baseFile } );
+        }
+
+        String[] inDir = baseDir.list();    // directory contents
+        TreeMap found = new TreeMap();        // the files we've found
+        int highest = -1;            // largest # suffix seen
+
+        // no directory or files (can happen on destroy)
+        if( inDir == null )
+        {
+            return highest;
+        }
+
+        fileLoop:
+        for( int f = 0; f < inDir.length; f++ )
+        {
+
+            String name = inDir[ f ];
+
+            logger.log( Level.FINE, "checking {0}", name );
+
+            if( !name.startsWith( baseFile ) )        // is it one of ours?
+            {
+                continue;
+            }
+
+            // ensure that there is a numerical suffix
+            int num;
+            try
+            {
+                num = Integer.parseInt( name.substring( baseFile.length() ) );
+                if( num > highest )               // keep track of highest
+                {
+                    highest = num;
+                }
+            }
+            catch( NumberFormatException e )
+            {
+                continue fileLoop;               // can't be one of ours
+            }
+
+            found.put( new Integer( num ), new File( baseDir, name ) );
+        }
+
+        files.addAll( found.values() );
+        if( logger.isLoggable( Level.FINE ) )
+        {
+            logger.log( Level.FINE, "returning {0} files",
+                        new Integer( files.size() ) );
+            Iterator it = files.iterator();
+            while( it.hasNext() )
+            {
+                logger.log( Level.FINE, it.next().toString() );
+            }
+        }
+
+        return highest;
+    }
+
+    /**
+     * Destroy all log files associated with this stream.
+     */
+    void destroy()
+    {
+        if( logger.isLoggable( Level.FINE ) )
+        {
+            logger.log( Level.FINE, "destroy" );
+        }
+
+        ArrayList files = new ArrayList();
+        existingLogs( files );
+        for( int i = 0; i < files.size(); i++ )
+        {
+            File log = (File) files.get( i );
+            try
+            {
+                if( !log.delete() )
+                {
+                    logger.log( Level.INFO, "Could not delete {0}", log );
+                }
+            }
+            catch( SecurityException e )
+            {
+                if( !log.delete() )
+                {
+                    logger.log( Level.INFO,
+                                "SecurityException : Could not delete " + log,
+                                e );
+                }
+            }
+        }
+    }
+}

Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogInputFile.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogInputFile.java?rev=724979&view=auto
==============================================================================
--- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogInputFile.java (added)
+++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogInputFile.java Tue Dec  9 21:13:53 2008
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.outrigger.snaplogstore;
+
+import com.sun.jini.outrigger.OutriggerServerImpl;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.jini.space.InternalSpaceException;
+
+/**
+ * A class to help you read log files created by <code>LogOutputFile</code>.
+ *
+ * @author Sun Microsystems, Inc.
+ * @see LogOutputFile
+ */
+class LogInputFile extends LogFile
+{
+    private File file;    // the current log file
+
+    private static final long intBytes = 4;
+
+    /**
+     * Logger for logging persistent store related information
+     */
+    private static final Logger logger =
+        Logger.getLogger( OutriggerServerImpl.storeLoggerName );
+
+    /**
+     * Return an <code>Iterator</code> that will loop through all
+     * the logs that match the given <code>basePath</code> pattern,
+     * interpreted as described in the matching <code>LogStream</code>
+     * constructor.  If <code>returnAll</code> is <code>false</code>,
+     * the most recent file will be left off the list.  This would be
+     * the proper value for an ongoing poll looking for completed log
+     * files.  You would specify <code>true</code> during recovery,
+     * when all existing logs should be committed because no new ones
+     * are currently being created
+     *
+     * @see java.util.Iterator
+     * @see LogStream#LogStream(String)
+     */
+    static Iterator logs( String basePath, boolean returnAll )
+        throws IOException
+    {
+        LogFile lf = new LogFile( basePath );// an object to represent the path
+        ArrayList inDir = new ArrayList();
+        lf.existingLogs( inDir );
+
+        // strip off most recent if we're not trying to read them all
+        if( !returnAll && inDir.size() > 0 )
+        {
+            inDir.remove( inDir.size() - 1 );
+        }
+
+        return new LogInputFileIterator( inDir, lf );
+    }
+
+    /**
+     * The implementation of <code>Iterator</code> returned by
+     * <code>LogInputStream.logs</code>.  The <code>next</code> method
+     * occasionally returns <code>null</code>.
+     *
+     * @see LogInputFileIterator#next
+     */
+    private static class LogInputFileIterator implements Iterator
+    {
+        private LogFile baseLogFile;
+        private Iterator fileList;
+
+        /**
+         * Create a new <code>LogInputFileIterator</code> object
+         * for the given list.
+         */
+        LogInputFileIterator( Collection files, LogFile baseLogFile )
+        {
+            this.baseLogFile = baseLogFile;
+            fileList = files.iterator();
+        }
+
+        public boolean hasNext()
+        {
+            return fileList.hasNext();
+        }
+
+        /**
+         * Return the next <code>File</code> object, or
+         * <code>null</code>.  You will get <code>null</code> when the
+         * file existed at the time of listing, but no longer exists
+         * when the iterator gets to it.  For example, if a process is
+         * consuming all completed logs, the listing might find a log,
+         * but that process may have consumed and removed it by the
+         * time you invoke <code>next</code>, so you will get a
+         * <code>null</code>.
+         */
+        public Object next()
+        {
+            File file = (File) fileList.next();
+            try
+            {
+                return new LogInputFile( baseLogFile, file );
+            }
+            catch( IOException e )
+            {
+                file.delete();    // file is malformed -- remove it
+                return null;    // can't throw any reasonable exception,
+                // so signal the problem with a null
+            }
+        }
+
+        /**
+         * Remove the <code>File</code> object returned by the iterator
+         * from the list.  This does <em>not</em> remove the file
+         * itself.
+         */
+        public void remove()
+        {
+            fileList.remove();
+        }
+    }
+
+    /**
+     * Create a new <code>LogInputFile</code>.
+     * <p>
+     * <b>Note:</b> Don't invoke this.  This is needed by the
+     * enumeration returned by <code>logs</code>, which is how you
+     * should be getting <code>LogInputFile</code> objects.  When
+     * nested classes arrive, this constructor can be properly
+     * protected.
+     *
+     * @see logs
+     */
+    private LogInputFile( LogFile desc, File path ) throws IOException
+    {
+        super( desc.baseDir, desc.baseFile );
+        file = path;
+    }
+
+    /**
+     * Consume the input file, invoking the appropriate operations on
+     * the given object.
+     */
+    synchronized void consume( BackEnd opOn )
+    {
+        try
+        {
+            DataInputStream din =
+                new DataInputStream( new BufferedInputStream(
+                    new FileInputStream( file ) ) );
+            ObjectInputStream in = new ObjectInputStream( din );
+
+            long length = file.length();
+            int fileVer = din.readInt();
+
+            if( fileVer != LOG_VERSION )
+            {
+                failure( "unsupported log version: " + fileVer );
+            }
+
+            long logBytes = intBytes;
+            int updateLen = din.readInt();
+
+            Long txnId;
+            int count;
+            Resource rep;
+            byte[] cookie;
+
+            while( updateLen != 0 )
+            {    /* 0 is expected termination case */
+
+                if( updateLen < 0 )    /* serious corruption */
+                {
+                    failure( "file corrupted, negative record length at " +
+                             logBytes );
+                }
+
+                if( length - logBytes - intBytes < updateLen )
+
+                    /* partial record at end of log; this should not happen
+                    * if forceToDisk is always true, but might happen if
+                    * buffered updates are used.
+                    */
+                {
+                    failure( "file corrupted, partial record at " + logBytes );
+                }
+
+                int op = in.readByte();
+
+                switch( op )
+                {
+                case BOOT_OP:
+                    long time = in.readLong();
+                    long sessionId = in.readLong();
+                    opOn.bootOp( time, sessionId );
+                    break;
+
+                case JOINSTATE_OP:
+                    BaseObject state = (BaseObject) in.readObject();
+                    opOn.joinStateOp( state );
+                    break;
+
+                case WRITE_OP:
+                    rep = (Resource) in.readObject();
+                    txnId = (Long) in.readObject();
+                    opOn.writeOp( rep, txnId );
+                    break;
+
+                case BATCH_WRITE_OP:
+                    txnId = (Long) in.readObject();
+                    count = in.readInt();
+                    for( int i = 0; i < count; i++ )
+                    {
+                        rep = (Resource) in.readObject();
+                        opOn.writeOp( rep, txnId );
+                    }
+                    break;
+
+                case TAKE_OP:
+                    cookie = new byte[16];
+                    in.readFully( cookie );
+                    txnId = (Long) in.readObject();
+                    opOn.takeOp( cookie, txnId );
+                    break;
+
+                case BATCH_TAKE_OP:
+                    txnId = (Long) in.readObject();
+                    count = in.readInt();
+                    for( int i = 0; i < count; i++ )
+                    {
+                        cookie = new byte[16];
+                        in.readFully( cookie );
+                        opOn.takeOp( cookie, txnId );
+                    }
+                    break;
+
+                case REGISTER_OP:
+                    Registration registration =
+                        (Registration) in.readObject();
+                    opOn.registerOp( registration );
+                    break;
+
+                case RENEW_OP:
+                    cookie = new byte[16];
+                    in.readFully( cookie );
+                    long expires = in.readLong();
+                    opOn.renewOp( cookie, expires );
+                    break;
+
+                case CANCEL_OP:
+                    cookie = new byte[16];
+                    in.readFully( cookie );
+                    opOn.cancelOp( cookie );
+                    break;
+
+                case PREPARE_OP:
+                    txnId = (Long) in.readObject();
+                    BaseObject transaction = (BaseObject) in.readObject();
+                    opOn.prepareOp( txnId, transaction );
+                    break;
+
+                case COMMIT_OP:
+                    txnId = (Long) in.readObject();
+                    opOn.commitOp( txnId );
+                    break;
+
+                case ABORT_OP:
+                    txnId = (Long) in.readObject();
+                    opOn.abortOp( txnId );
+                    break;
+
+                case UUID_OP:
+                    final byte uuid[] = new byte[16];
+                    in.readFully( uuid );
+                    opOn.uuidOp( uuid );
+                    break;
+
+                default:
+                    failure( "log record corrupted, unknown opcode" );
+
+                }  // case
+
+                logBytes += ( intBytes + updateLen );
+
+                // deal with padding
+                int offset = (int) logBytes & 3;
+                if( offset > 0 )
+                {
+                    offset = 4 - offset;
+                    logBytes += offset;
+                    din.skipBytes( offset );
+                }
+                updateLen = din.readInt();
+
+            }  // while
+        }
+        catch( EOFException e )
+        {
+            failure( "unexpected end-of-file", e );
+
+        }
+        catch( IOException e )
+        {
+            failure( "I/O error while consuming logs", e );
+
+        }
+        catch( ClassNotFoundException e )
+        {
+            failure( "unexpected class?", e );
+        }
+    }
+
+    /**
+     * Report a failure consuming the log file and throw an
+     * <code>InternalSpaceException</code> containing <code>message</code>.
+     */
+    private void failure( String message )
+    {
+        failure( message, null );
+    }
+
+    /**
+     * Report a exception while consuming the log file and throw an
+     * <code>InternalSpaceException</code> containing <code>message</code>.
+     */
+    private void failure( String message, Exception e )
+    {
+        String errorMsg = "Error consuming log file: " + file + ", " +
+                          message + "Log file consumption stopped";
+
+        final InternalSpaceException ise =
+            new InternalSpaceException( errorMsg, e );
+        logger.log( Level.SEVERE, errorMsg, ise );
+        throw ise;
+    }
+
+    /**
+     * This log has been successfully drained, and committed -- it can be
+     * removed.
+     */
+    void finished()
+    {
+        file.delete();
+    }
+
+    public String toString()
+    {
+        return file.toString();
+    }
+}

Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogOutputFile.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogOutputFile.java?rev=724979&view=auto
==============================================================================
--- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogOutputFile.java (added)
+++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogOutputFile.java Tue Dec  9 21:13:53 2008
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.outrigger.snaplogstore;
+
+import com.sun.jini.outrigger.LogOps;
+import com.sun.jini.outrigger.OutriggerServerImpl;
+import com.sun.jini.outrigger.StorableObject;
+import com.sun.jini.outrigger.StorableResource;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Observable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.jini.id.Uuid;
+import net.jini.space.InternalSpaceException;
+
+/**
+ * A class to write a log file, to be read later by
+ * <code>LogInputFile</code>.  Each operation on the file is forced to
+ * disk, so when the operation logging function returns, the data is
+ * committed to the log in a recoverable way.
+ * <p>
+ * <code>LogOutputFile</code> cannot extend <code>Observable</code>
+ * because it must extend <code>LogFile</code> (clearly
+ * <code>Observable</code> should have been an interface).  It acts as
+ * an <code>Observable</code> by having a method that returns its
+ * "observable part", which is an object that reports observable
+ * events.  Right now the only observable event is the switching to a
+ * new physical file when the current one becomes full.
+ *
+ * @author Sun Microsystems, Inc.
+ * @see LogInputFile
+ * @see java.util.Observable
+ */
+class LogOutputFile extends LogFile implements LogOps
+{
    private RandomAccessFile logFile = null;// the current output log file
    private FileDescriptor logFD;       // descriptor of logFile; sync() forces bytes to disk
    private ObjectOutputStream out;       // object stream layered over logFile
    private int suffix;       // the current suffix number
    private int opCnt;       // number of ops on current file
    private int maxOps;       // max ops to allow in file
    private Observable observable;// handle Observer/Observable

    // bytes written so far to the current log file
    private long logBytes = 0;
    // scratch buffer for encoding a 4-byte int length header
    private final byte[] intBuf = new byte[4];
    // constant all-zero buffer written as the "no next record" header
    private final byte[] zeroBuf = new byte[4];

    // Length and file position of an update header whose write was
    // deferred; 0 length means nothing pending.  nextPath() writes the
    // pending header (after a sync) before rolling to a new file.
    // NOTE(review): presumably deferral covers unprepared transactional
    // ops -- confirm against the flush() implementation.
    private long deferedUpdateLength = 0;
    private long deferedPosition = 0;

    // width in bytes of the int length headers in the log
    private static final long intBytes = 4;

    /**
     * Logger for logging persistent store related information
     */
    private static final Logger logger =
        Logger.getLogger( OutriggerServerImpl.storeLoggerName );
+
+    /**
+     * Create a <code>LogOutputFile</code> object that will stream
+     * output to a series of files described by <code>basePath</code>,
+     * as interpreted by the relevant <code>LogFile</code>
+     * constructor.  When the file becomes full (the maximum number of
+     * operations is reached), the file is closed and a new file with
+     * the next highest suffix is created.  The
+     * <code>Observable</code> notification for this event passes a
+     * <code>File</code> argument for the filled file as the argument
+     * to <code>Observer</code>.
+     *
+     * @see LogStream#LogStream(String)
+     * @see observable()
+     */
+    LogOutputFile( String basePath, int maxOps ) throws IOException
+    {
+        super( basePath );
+        ArrayList inDir = new ArrayList();
+        suffix = existingLogs( inDir );
+        this.maxOps = maxOps;
+        nextPath();
+    }
+
+    /**
+     * Return an <code>Observable</code> object that represents this object
+     * in the Observer/Observable pattern.
+     *
+     * @see java.util.Observer
+     */
+    Observable observable()
+    {
+        if( observable == null )
+        {         // defer allocation until needed
+            observable = new Observable()
+            {  // we only use this if changed
+
+                public void notifyObservers()
+                {
+                    setChanged();
+                    super.notifyObservers();
+                }
+
+                public void notifyObservers( Object arg )
+                {
+                    setChanged();
+                    super.notifyObservers( arg );
+                }
+            };
+        }
+        return observable;
+    }
+
    /**
     * Switch this over to the next path in the list: flush any pending
     * deferred header to the current file, close it, open the file with
     * the next suffix, and write the fresh file's version and empty
     * length header.  Notifies observers about the completed old file
     * only after the new one exists.
     *
     * @throws IOException if the new log file cannot be created or
     *                     its header cannot be written
     */
    private void nextPath() throws IOException
    {
        boolean completed = false;

        if( logFile != null )
        {

            // If there was a deferred header, write it out now
            //
            if( deferedUpdateLength != 0 )
            {
                logFD.sync();        // force the bytes to disk
                // header is written only after its record is durable
                logFile.seek( deferedPosition );
                writeInt( (int) deferedUpdateLength );
            }
            try
            {
                close();               // close the stream and the file
            }
            catch( IOException ignore )
            {
            } // assume this is okay
            completed = true;
        }

        suffix++;            // go to next suffix
        logFile = new RandomAccessFile( baseDir.getPath() + File.separator +
                                        baseFile + suffix, "rw" );
        logFD = logFile.getFD();
        out = new ObjectOutputStream( new LogOutputStream( logFile ) );

        // every log file starts with the stream version number
        writeInt( LOG_VERSION );

        // truncate to exactly the header we just wrote
        logBytes = logFile.getFilePointer();
        logFile.setLength( logBytes );

        // always start out with zero length header for the next update
        logFile.write( zeroBuf );

        // force length header to disk
        logFD.sync();

        deferedUpdateLength = 0;
        opCnt = 0;

        /*
       * Tell consumer about the completed log.  This is done after the
       * new one is created so that the old path can be known not
       * to be the newest (because something newer is there).
       */
        if( observable != null && completed )
        {
            observable.notifyObservers();
        }
    }
+
+    /**
+     * Close the log, but don't remove it.
+     */
+    synchronized void close() throws IOException
+    {
+        if( logFile != null )
+        {
+            try
+            {
+                out.close();
+                logFile.close();
+            }
+            finally
+            {
+                logFile = null;
+            }
+        }
+    }
+
+    /**
+     * Override destroy so we can try to close logFile before calling
+     * super tries to delete all the files.
+     */
+    void destroy()
+    {
+        try
+        {
+            close();
+        }
+        catch( Throwable t )
+        {
+            // Don't let failure keep us from deleting the files we can
+        }
+        super.destroy();
+    }
+
+    /**
+     * Log a server boot.
+     */
+    public synchronized void bootOp( long time, long sessionId )
+    {
+        try
+        {
+            out.writeByte( BOOT_OP );
+            out.writeLong( time );
+            out.writeLong( sessionId );
+            flush();
+        }
+        catch( IOException e )
+        {
+            failed( e );
+        }
+    }
+
+    /**
+     * Log a change in join state
+     */
+    public synchronized void joinStateOp( StorableObject state )
+    {
+        try
+        {
+            out.writeByte( JOINSTATE_OP );
+            out.writeObject( new BaseObject( state ) );
+            flush();
+        }
+        catch( IOException e )
+        {
+            failed( e );
+        }
+    }
+
+    /**
+     * Log a <code>write</code> operation.
+     */
+    public synchronized void writeOp( StorableResource entry, Long txnId )
+    {
+        try
+        {
+            out.writeByte( WRITE_OP );
+            out.writeObject( new Resource( entry ) );
+            out.writeObject( txnId );
+
+            // A write operation under a transaction does not need to be
+            // flushed until it is prepared.
+            //
+            flush( txnId == null );
+        }
+        catch( IOException e )
+        {
+            failed( e );
+        }
+    }
+
+    // Inherit java doc from supertype
+    public synchronized void writeOp( StorableResource entries[], Long txnId )
+    {
+        try
+        {
+            out.writeByte( BATCH_WRITE_OP );
+            out.writeObject( txnId );
+
+            // In the middle of records we need to use the stream's
+            // writeInt, not our private one
+            out.writeInt( entries.length );
+            for( int i = 0; i < entries.length; i++ )
+            {
+                out.writeObject( new Resource( entries[ i ] ) );
+            }
+
+            // A write operation under a transaction does not need to be
+            // flushed until it is prepared.
+            //
+            flush( txnId == null, entries.length );
+        }
+        catch( IOException e )
+        {
+            failed( e );
+        }
+    }
+
+    /**
+     * Log a <code>take</code> operation.
+     */
+    public synchronized void takeOp( Uuid cookie, Long txnId )
+    {
+        try
+        {
+            out.writeByte( TAKE_OP );
+            cookie.write( out );
+            out.writeObject( txnId );
+
+            // A take operation under a transaction does not need to be
+            // flushed until it is prepared.
+            //
+            flush( txnId == null );
+        }
+        catch( IOException e )
+        {
+            failed( e );
+        }
+    }
+
+    // Inherit java doc from supertype
+    public synchronized void takeOp( Uuid cookies[], Long txnId )
+    {
+        try
+        {
+            out.writeByte( BATCH_TAKE_OP );
+            out.writeObject( txnId );
+
+            // In the middle of records we need to use the stream's
+            // writeInt, not our private one
+            out.writeInt( cookies.length );
+            for( int i = 0; i < cookies.length; i++ )
+            {
+                cookies[ i ].write( out );
+            }
+
+            // A take operation under a transaction does not need to be
+            // flushed until it is prepared.
+            //
+            flush( txnId == null, cookies.length );
+        }
+        catch( IOException e )
+        {
+            failed( e );
+        }
+    }
+
+    /**
+     * Log a <code>notify</code> operation.
+     */
+    public synchronized void registerOp( StorableResource registration,
+                                         String type, StorableObject[] templates )
+    {
+        try
+        {
+            out.writeByte( REGISTER_OP );
+            out.writeObject( new Registration( registration, type, templates ) );
+            flush();
+        }
+        catch( IOException e )
+        {
+            failed( e );
+        }
+    }
+
+    /**
+     * Log a <code>renew</code> operation.
+     */
+    public synchronized void renewOp( Uuid cookie, long expiration )
+    {
+        try
+        {
+            out.writeByte( RENEW_OP );
+            cookie.write( out );
+            out.writeLong( expiration );
+            flush();
+        }
+        catch( IOException e )
+        {
+            failed( e );
+        }
+    }
+
+    /**
+     * Log a <code>cancel</code> operation.
+     */
+    public synchronized void cancelOp( Uuid cookie, boolean expired )
+    {
+        try
+        {
+            out.writeByte( CANCEL_OP );
+            cookie.write( out );
+
+            // cancels due to expiration don't need to be flushed
+            // right away
+            flush( !expired );
+        }
+        catch( IOException e )
+        {
+            failed( e );
+        }
+    }
+
+    /**
+     * Log a transaction <code>prepare</code> operation.
+     */
+    public synchronized void prepareOp( Long txnId,
+                                        StorableObject transaction )
+    {
+        try
+        {
+            out.writeByte( PREPARE_OP );
+            out.writeObject( txnId );
+            out.writeObject( new BaseObject( transaction ) );
+            flush();
+        }
+        catch( IOException e )
+        {
+            failed( e );
+        }
+    }
+
+    /**
+     * Log a transaction <code>commit</code> operation.
+     */
+    public synchronized void commitOp( Long txnId )
+    {
+        try
+        {
+            out.writeByte( COMMIT_OP );
+            out.writeObject( txnId );
+            flush();
+        }
+        catch( IOException e )
+        {
+            failed( e );
+        }
+    }
+
+    /**
+     * Log a transaction <code>abort</code> operation.
+     */
+    public synchronized void abortOp( Long txnId )
+    {
+        try
+        {
+            out.writeByte( ABORT_OP );
+            out.writeObject( txnId );
+            flush();
+        }
+        catch( IOException e )
+        {
+            failed( e );
+        }
+    }
+
+    public synchronized void uuidOp( Uuid uuid )
+    {
+        try
+        {
+            out.writeByte( UUID_OP );
+            uuid.write( out );
+            flush();
+        }
+        catch( IOException e )
+        {
+            failed( e );
+        }
+    }
+
+    /**
+     * Flush the current output after an operation.  If the number of
+     * operations is exceeded, shift over to the next path.
+     */
+    private void flush() throws IOException
+    {
+        flush( true );
+    }
+
+
+    /**
+     * Conditionally flush the current output. If the number of
+     * operations is exceeded, shift over to the next path even if
+     * <code>forceToDisk</code> is <code>false</code>.
+     */
+    private synchronized void flush( boolean forceToDisk )
+        throws IOException
+    {
+        flush( forceToDisk, 1 );
+    }
+
    /**
     * Conditionally flush the current output. If the number of
     * operations is exceeded, shift over to the next path even if
     * <code>forceToDisk</code> is <code>false</code>.
     *
     * @param forceToDisk      if <code>true</code>, sync the file and
     *                         commit the record's length header now; if
     *                         <code>false</code>, the first such record's
     *                         header is deferred so a reader never sees a
     *                         partially written record
     * @param effectiveOpCount number of logical operations this record
     *                         represents (batches count each entry);
     *                         must be positive
     * @throws IOException if writing or syncing the log file fails
     */
    private synchronized void flush( boolean forceToDisk,
                                     int effectiveOpCount )
        throws IOException
    {
        assert effectiveOpCount > 0;

        out.flush();

        if( forceToDisk )
        {

            // must force contents to disk before writing real length header
            logFD.sync();
        }

        // Length of the record just written: distance from the last
        // committed header position (logBytes) to the current file
        // position, minus the size of the header itself.
        long entryEnd = logFile.getFilePointer();
        long updateLen = entryEnd - logBytes - intBytes;

        // If we are not forcing to disk, we want to defer the write of the
        // first header. This will leave a zero just after the last sync'ed
        // record and will assure that LogInputFile will not read a partially
        // written record.
        //
        if( !forceToDisk )
        {

            // If this is the first flush(false) we save the header information
            // and location for later. Otherwise we write out the header
            // normally.
            //
            if( deferedUpdateLength == 0 )
            {
                deferedUpdateLength = updateLen;  // save the header length
                deferedPosition = logBytes;       // and position for later
            }
            else
            {
                // write real length header
                logFile.seek( logBytes );
                writeInt( (int) updateLen );
            }
        }
        else
        {

            // If there was a deferred header, write that out now and
            // then write the current header.
            //
            if( deferedUpdateLength != 0 )
            {
                logFile.seek( deferedPosition );
                writeInt( (int) deferedUpdateLength );
                deferedUpdateLength = 0;
            }
            // write real length header
            logFile.seek( logBytes );
            writeInt( (int) updateLen );
        }

        // pad out update record so length header does not span disk blocks
        // (round the end position up to the next 4-byte boundary)
        entryEnd = ( entryEnd + 3 ) & ~3L;

        // write zero length header for next update
        logFile.seek( entryEnd );
        logFile.write( zeroBuf );
        logBytes = entryEnd;

        if( forceToDisk )
        {
            // Sync again so the zero header (and any headers committed
            // above) are durable before returning.
            logFD.sync();
        }

        // Roll over to a fresh log file once enough operations have
        // accumulated; otherwise reset the object stream so it drops its
        // back-reference table for previously written objects.
        opCnt += effectiveOpCount;
        if( opCnt >= maxOps )
        {
            nextPath();
        }
        else
        {
            out.reset();        // not critical to flush this
        }
    }
+
+    /**
+     * Write an int value in single write operation. Note we only use
+     * this method when writing log file and recored headers.  We
+     * can't use it inside records because the data inside records is
+     * written/read using <code>ObjectIn/OutputStream</code> and this
+     * method writes directly to the <code>RandomAccessFile</code>.
+     *
+     * @param val int value
+     * @throws IOException if any other I/O error occurs
+     */
+    private void writeInt( int val ) throws IOException
+    {
+        intBuf[ 0 ] = (byte) ( val >> 24 );
+        intBuf[ 1 ] = (byte) ( val >> 16 );
+        intBuf[ 2 ] = (byte) ( val >> 8 );
+        intBuf[ 3 ] = (byte) val;
+        logFile.write( intBuf );
+    }
+
    /**
     * Report an unrecoverable failure while persisting space data.
     * Logs the error at SEVERE and then halts the VM; despite the
     * declared <code>InternalSpaceException</code>, this method never
     * returns normally because it calls <code>System.exit</code>.
     *
     * @param e the exception that made persistence impossible
     */
    private void failed( Exception e ) throws InternalSpaceException
    {
        logger.log( Level.SEVERE,
                    "Unexpected I/O error while persisting Space data",
                    e );
        System.exit( -5 );
    }
+}

Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogOutputStream.java?rev=724979&view=auto
==============================================================================
--- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogOutputStream.java (added)
+++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogOutputStream.java Tue Dec  9 21:13:53 2008
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.sun.jini.outrigger.snaplogstore;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+
/**
 * This class extends the functionality of the java.io.OutputStream class
 * in order to provide an output mechanism that can be used by processes
 * that perform logging operations; in particular, processes that store
 * state in order to provide persistence. All writes are delegated to an
 * underlying <code>RandomAccessFile</code>, and <code>close()</code> is
 * deliberately a no-op so the underlying file stays usable.
 *
 * @author Sun Microsystems, Inc.
 * @see java.io.OutputStream
 */
public class LogOutputStream extends OutputStream
{

    /** The underlying file every write is delegated to. */
    private final RandomAccessFile raf;

    /**
     * Creates an output stream that writes to the specified
     * <code>RandomAccessFile</code>.
     *
     * @param raf the output file
     * @throws IOException If an I/O error has occurred.
     *         (Declared for compatibility; this constructor does not
     *         itself perform I/O.)
     */
    public LogOutputStream( RandomAccessFile raf ) throws IOException
    {
        this.raf = raf;
    }

    /**
     * Writes a byte of data. This method will block until the byte is
     * actually written.
     *
     * @param b the byte to be written
     * @throws IOException If an I/O error has occurred.
     */
    public void write( int b ) throws IOException
    {
        raf.write( b );
    }

    /**
     * Writes an array of bytes. Will block until the bytes
     * are actually written.
     *
     * @param b the data to be written
     * @throws IOException If an I/O error has occurred.
     */
    public void write( byte b[] ) throws IOException
    {
        raf.write( b );
    }

    /**
     * Writes a sub-array of bytes.
     *
     * @param b   the data to be written
     * @param off the start offset in the data
     * @param len the number of bytes to write
     * @throws IOException If an I/O error has occurred.
     */
    public void write( byte b[], int off, int len ) throws IOException
    {
        raf.write( b, off, len );
    }

    /**
     * A LogOutputStream cannot be closed, so this does nothing; the
     * underlying <code>RandomAccessFile</code> remains open.
     *
     * @throws IOException If an I/O error has occurred.
     */
    public final void close() throws IOException
    {
    }

}

Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogStore.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogStore.java?rev=724979&view=auto
==============================================================================
--- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogStore.java (added)
+++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/snaplogstore/LogStore.java Tue Dec  9 21:13:53 2008
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.outrigger.snaplogstore;
+
+import com.sun.jini.config.Config;
+import com.sun.jini.outrigger.LogOps;
+import com.sun.jini.outrigger.OutriggerServerImpl;
+import com.sun.jini.outrigger.Recover;
+import com.sun.jini.outrigger.Store;
+import com.sun.jini.system.FileSystem;
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.jini.config.Configuration;
+import net.jini.config.ConfigurationException;
+import net.jini.space.InternalSpaceException;
+
+/**
+ * @author Sun Microsystems, Inc.
+ * @see com.sun.jini.outrigger.OutriggerServerImpl
+ */
+public class LogStore implements Store
+{
+    private LogOutputFile log;
+    private final String path;
+    private BackEnd be;
+    private int maxOps;
+
+    /**
+     * Logger for logging persistent store related information
+     */
+    private static final Logger logger =
+        Logger.getLogger( OutriggerServerImpl.storeLoggerName );
+
+    /**
+     * Create a new <code>LogStore</code>.
+     *
+     * @param config the directory to use for persistence.
+     */
+    public LogStore( Configuration config ) throws ConfigurationException
+    {
+        path = (String) Config.getNonNullEntry( config,
+                                                OutriggerServerImpl.COMPONENT_NAME,
+                                                OutriggerServerImpl.PERSISTENCE_DIR_CONFIG_ENTRY,
+                                                String.class );
+
+        logger.log( Level.CONFIG, "using directory {0}", path );
+
+        FileSystem.ensureDir( path );
+
+        be = new BackEnd( path );
+
+        maxOps = Config.getIntEntry( config,
+                                     OutriggerServerImpl.COMPONENT_NAME,
+                                     "maxOps",
+                                     1000, 1, Integer.MAX_VALUE );
+    }
+
+    /**
+     * Setup store, recover previous state if any.
+     *
+     * @param space object used for recovery of previous state
+     * @return object used to persist state
+     */
+    public LogOps setupStore( Recover space )
+    {
+        try
+        {
+            be.setupStore( space );
+
+            // Use the log type as the file prefix
+            //
+            log = new LogOutputFile(
+                new File( path, LogFile.LOG_TYPE ).getAbsolutePath(),
+                maxOps );
+
+            log.observable().addObserver( be );
+        }
+        catch( IOException e )
+        {
+            final String msg = "LogStore: log creation failed";
+            final InternalSpaceException ise =
+                new InternalSpaceException( msg, e );
+            logger.log( Level.SEVERE, msg, ise );
+            throw ise;
+        }
+        return log;
+    }
+
+    /**
+     * Destroy everything.
+     */
+    public void destroy() throws IOException
+    {
+        be.destroy();
+        log.destroy();
+        new File( path ).delete();
+    }
+
+    // Inherit from super
+    public void close() throws IOException
+    {
+        be.close();
+        log.close();
+    }
+}



Mime
View raw message