hadoop-hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zs...@apache.org
Subject svn commit: r768120 [5/5] - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/util/ ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/ ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/helper/ ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/h...
Date Fri, 24 Apr 2009 01:51:01 GMT
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowIdManager.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowIdManager.java?rev=768120&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowIdManager.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowIdManager.java Fri Apr 24 01:50:59 2009
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * JDBM LICENSE v1.00
+ *
+ * Redistribution and use of this software and associated documentation
+ * ("Software"), with or without modification, are permitted provided
+ * that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain copyright
+ *    statements and notices.  Redistributions must also contain a
+ *    copy of this document.
+ *
+ * 2. Redistributions in binary form must reproduce the
+ *    above copyright notice, this list of conditions and the
+ *    following disclaimer in the documentation and/or other
+ *    materials provided with the distribution.
+ *
+ * 3. The name "JDBM" must not be used to endorse or promote
+ *    products derived from this Software without prior written
+ *    permission of Cees de Groot.  For written permission,
+ *    please contact cg@cdegroot.com.
+ *
+ * 4. Products derived from this Software may not be called "JDBM"
+ *    nor may "JDBM" appear in their names without prior written
+ *    permission of Cees de Groot.
+ *
+ * 5. Due credit should be given to the JDBM Project
+ *    (http://jdbm.sourceforge.net/).
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE JDBM PROJECT AND CONTRIBUTORS
+ * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
+ * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
+ * CEES DE GROOT OR ANY CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+ * OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Copyright 2000 (C) Cees de Groot. All Rights Reserved.
+ * Contributions are Copyright (C) 2000 by their associated contributors.
+ *
+ * $Id: PhysicalRowIdManager.java,v 1.3 2003/03/21 03:00:09 boisvert Exp $
+ */
+
+package org.apache.hadoop.hive.ql.util.jdbm.recman;
+
+import java.io.IOException;
+
+/**
+ *  This class manages physical row ids, and their data.
+ *  <p>
+ *  A physical rowid is a (block, offset) pair (a {@link Location}) that
+ *  points at a record header inside a data page; record data may span
+ *  multiple pages. Free space from deleted records is recycled through a
+ *  {@link FreePhysicalRowIdPageManager}.
+ */
+final class PhysicalRowIdManager
+{
+
+    // The file we're talking to and the associated page manager.
+    private RecordFile file;
+    private PageManager pageman;
+    // Free-list of reusable physical rowids; consulted first by alloc().
+    private FreePhysicalRowIdPageManager freeman;
+
+    /**
+     *  Creates a new rowid manager using the indicated record file.
+     *  and page manager.
+     */
+    PhysicalRowIdManager( RecordFile file, PageManager pageManager )
+        throws IOException
+    {
+        this.file = file;
+        this.pageman = pageManager;
+        this.freeman = new FreePhysicalRowIdPageManager(file, pageman);
+    }
+
+    /**
+     *  Inserts a new record. Returns the new physical rowid.
+     */
+    Location insert( byte[] data, int start, int length )
+        throws IOException
+    {
+        Location retval = alloc( length );
+        write( retval, data, start, length );
+        return retval;
+    }
+
+    /**
+     *  Updates an existing record. Returns the possibly changed
+     *  physical rowid.
+     *  <p>
+     *  If the new data fits in the record's available size the rowid is
+     *  kept; otherwise the old rowid is freed and a new one allocated.
+     */
+    Location update( Location rowid, byte[] data, int start, int length )
+        throws IOException
+    {
+        // fetch the record header
+        BlockIo block = file.get( rowid.getBlock() );
+        RecordHeader head = new RecordHeader( block, rowid.getOffset() );
+        if ( length > head.getAvailableSize() ) {
+            // not enough space - we need to copy to a new rowid.
+            file.release( block );
+            free( rowid );
+            rowid = alloc( length );
+        } else {
+            file.release( block );
+        }
+
+        // 'nuff space, write it in and return the rowid.
+        write( rowid, data, start, length );
+        return rowid;
+    }
+
+    /**
+     *  Deletes a record.
+     */
+    void delete( Location rowid )
+        throws IOException
+    {
+        free( rowid );
+    }
+
+    /**
+     *  Retrieves a record.
+     *  <p>
+     *  Walks the page chain from the record's first block, copying
+     *  getCurrentSize() bytes into a freshly allocated buffer.
+     */
+    byte[] fetch( Location rowid )
+        throws IOException 
+    {
+        // fetch the record header
+        // NOTE(review): presumably curs.getCurrent() here equals
+        // rowid.getBlock() (cursor starts at that block) -- confirm against
+        // PageCursor semantics.
+        PageCursor curs = new PageCursor( pageman, rowid.getBlock() );
+        BlockIo block = file.get( curs.getCurrent() );
+        RecordHeader head = new RecordHeader( block, rowid.getOffset() );
+
+        // allocate a return buffer
+        byte[] retval = new byte[ head.getCurrentSize() ];
+        if ( retval.length == 0 ) {
+            // zero-length record: release the block clean and return empty.
+            file.release( curs.getCurrent(), false );
+            return retval;
+        }
+
+        // copy bytes in
+        int offsetInBuffer = 0;
+        int leftToRead = retval.length;
+        // Data of the first block starts right after the record header.
+        short dataOffset = (short) (rowid.getOffset() + RecordHeader.SIZE);
+        while ( leftToRead > 0 ) {
+            // copy current page's data to return buffer
+            int toCopy = RecordFile.BLOCK_SIZE - dataOffset;
+            if ( leftToRead < toCopy ) {
+                toCopy = leftToRead;
+            }
+            System.arraycopy( block.getData(), dataOffset,
+                              retval, offsetInBuffer,
+                              toCopy );
+
+            // Go to the next block
+            leftToRead -= toCopy;
+            offsetInBuffer += toCopy;
+
+            file.release( block );
+
+            if ( leftToRead > 0 ) {
+                // Continuation blocks carry pure data starting at O_DATA.
+                block = file.get( curs.next() );
+                dataOffset = DataPage.O_DATA;
+            }
+
+        }
+
+        return retval;
+    }
+
+    /**
+     *  Allocate a new rowid with the indicated size.
+     *  <p>
+     *  Tries the free-list first; otherwise appends after the last used page.
+     */
+    private Location alloc( int size )
+        throws IOException
+    {
+        Location retval = freeman.get( size );
+        if ( retval == null ) {
+            retval = allocNew( size, pageman.getLast( Magic.USED_PAGE ) );
+        }
+        return retval;
+    }
+
+    /**
+     *  Allocates a new rowid. The second parameter is there to
+     *  allow for a recursive call - it indicates where the search
+     *  should start.
+     */
+    private Location allocNew( int size, long start )
+        throws IOException
+    {
+        BlockIo curBlock;
+        DataPage curPage;
+        if ( start == 0 ) {
+            // we need to create a new page.
+            start = pageman.allocate( Magic.USED_PAGE );
+            curBlock = file.get( start );
+            curPage = DataPage.getDataPageView( curBlock );
+            curPage.setFirst( DataPage.O_DATA );
+            // Seed an empty record header so the scan below terminates here.
+            RecordHeader hdr = new RecordHeader( curBlock, DataPage.O_DATA );
+            hdr.setAvailableSize( 0 );
+            hdr.setCurrentSize( 0 );
+        } else {
+            curBlock = file.get( start );
+            curPage = DataPage.getDataPageView( curBlock );
+        }
+
+        // follow the rowids on this page to get to the last one. We don't
+        // fall off, because this is the last page, remember?
+        short pos = curPage.getFirst();
+        if ( pos == 0 ) {
+            // page is exactly filled by the last block of a record
+            file.release( curBlock );
+            return allocNew( size, 0 );
+        }
+
+        RecordHeader hdr = new RecordHeader( curBlock, pos );
+        while ( hdr.getAvailableSize() != 0 && pos < RecordFile.BLOCK_SIZE ) {
+            pos += hdr.getAvailableSize() + RecordHeader.SIZE;
+            if ( pos == RecordFile.BLOCK_SIZE ) {
+                // Again, a filled page.
+                file.release( curBlock );
+                return allocNew( size, 0 );
+            }
+
+            hdr = new RecordHeader( curBlock, pos );
+        }
+
+        if ( pos == RecordHeader.SIZE ) {
+            // the last record exactly filled the page. Restart forcing
+            // a new page.
+            // NOTE(review): despite the comment, no restart happens -- the
+            // block is released here yet 'hdr' (backed by this block) is
+            // still written via setAvailableSize() below, and 'start'/'pos'
+            // are still used. Looks like a latent double-release /
+            // use-after-release inherited from upstream JDBM; confirm
+            // against RecordFile's release() contract.
+            file.release( curBlock );
+        }
+
+        // we have the position, now tack on extra pages until we've got
+        // enough space.
+        Location retval = new Location( start, pos );
+        int freeHere = RecordFile.BLOCK_SIZE - pos - RecordHeader.SIZE;
+        if ( freeHere < size ) {
+            // check whether the last page would have only a small bit left.
+            // if yes, increase the allocation. A small bit is a record
+            // header plus 16 bytes.
+            int lastSize = (size - freeHere) % DataPage.DATA_PER_PAGE;
+            if (( DataPage.DATA_PER_PAGE - lastSize ) < (RecordHeader.SIZE + 16) ) {
+                size += (DataPage.DATA_PER_PAGE - lastSize);
+            }
+
+            // write out the header now so we don't have to come back.
+            hdr.setAvailableSize( size );
+            file.release( start, true );
+
+            int neededLeft = size - freeHere;
+            // Refactor these two blocks!
+            while ( neededLeft >= DataPage.DATA_PER_PAGE ) {
+                // Whole continuation pages: no rowids on them, just data.
+                start = pageman.allocate( Magic.USED_PAGE );
+                curBlock = file.get( start );
+                curPage = DataPage.getDataPageView( curBlock );
+                curPage.setFirst( (short) 0 ); // no rowids, just data
+                file.release( start, true );
+                neededLeft -= DataPage.DATA_PER_PAGE;
+            }
+            if ( neededLeft > 0 ) {
+                // done with whole chunks, allocate last fragment.
+                start = pageman.allocate( Magic.USED_PAGE );
+                curBlock = file.get( start );
+                curPage = DataPage.getDataPageView( curBlock );
+                curPage.setFirst( (short) (DataPage.O_DATA + neededLeft) );
+                file.release( start, true );
+            }
+        } else {
+            // just update the current page. If there's less than 16 bytes
+            // left, we increase the allocation (16 bytes is an arbitrary
+            // number).
+            if ( freeHere - size <= (16 + RecordHeader.SIZE) ) {
+                size = freeHere;
+            }
+            hdr.setAvailableSize( size );
+            file.release( start, true );
+        }
+        return retval;
+
+    }
+
+
+    /**
+     *  Frees a rowid: zeroes its current size and hands the slot
+     *  (with its available size) to the free-list manager for reuse.
+     */
+    private void free( Location id )
+        throws IOException
+    {
+        // get the rowid, and write a zero current size into it.
+        BlockIo curBlock = file.get( id.getBlock() );
+        // NOTE(review): curPage is never read below -- dead local.
+        DataPage curPage = DataPage.getDataPageView( curBlock );
+        RecordHeader hdr = new RecordHeader( curBlock, id.getOffset() );
+        hdr.setCurrentSize( 0 );
+        file.release( id.getBlock(), true );
+
+        // write the rowid to the free list
+        freeman.put( id, hdr.getAvailableSize() );
+    }
+
+    /**
+     *  Writes out data to a rowid. Assumes that any resizing has been
+     *  done.
+     */
+    private void write(Location rowid, byte[] data, int start, int length )
+        throws IOException
+    {
+        PageCursor curs = new PageCursor( pageman, rowid.getBlock() );
+        BlockIo block = file.get( curs.getCurrent() );
+        RecordHeader hdr = new RecordHeader( block, rowid.getOffset() );
+        hdr.setCurrentSize( length );
+        if ( length == 0 ) {
+            // Header update alone; release dirty so it gets flushed.
+            file.release( curs.getCurrent(), true );
+            return;
+        }
+
+        // copy bytes in
+        int offsetInBuffer = start;
+        int leftToWrite = length;
+        // First block: data begins just past the record header.
+        short dataOffset = (short) (rowid.getOffset() + RecordHeader.SIZE);
+        while ( leftToWrite > 0 ) {
+            // copy current page's data to return buffer
+            int toCopy = RecordFile.BLOCK_SIZE - dataOffset;
+
+            if ( leftToWrite < toCopy ) {
+                toCopy = leftToWrite;
+            }
+            System.arraycopy( data, offsetInBuffer, block.getData(), 
+                              dataOffset, toCopy );
+
+            // Go to the next block
+            leftToWrite -= toCopy;
+            offsetInBuffer += toCopy;
+
+            // Release dirty: this block's contents were modified.
+            file.release( curs.getCurrent(), true );
+
+            if ( leftToWrite > 0 ) {
+                // Continuation blocks carry pure data starting at O_DATA.
+                block = file.get( curs.next() );
+                dataOffset = DataPage.O_DATA;
+            }
+        }
+    }
+}
+

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java?rev=768120&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java Fri Apr 24 01:50:59 2009
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * JDBM LICENSE v1.00
+ *
+ * Redistribution and use of this software and associated documentation
+ * ("Software"), with or without modification, are permitted provided
+ * that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain copyright
+ *    statements and notices.  Redistributions must also contain a
+ *    copy of this document.
+ *
+ * 2. Redistributions in binary form must reproduce the
+ *    above copyright notice, this list of conditions and the
+ *    following disclaimer in the documentation and/or other
+ *    materials provided with the distribution.
+ *
+ * 3. The name "JDBM" must not be used to endorse or promote
+ *    products derived from this Software without prior written
+ *    permission of Cees de Groot.  For written permission,
+ *    please contact cg@cdegroot.com.
+ *
+ * 4. Products derived from this Software may not be called "JDBM"
+ *    nor may "JDBM" appear in their names without prior written
+ *    permission of Cees de Groot.
+ *
+ * 5. Due credit should be given to the JDBM Project
+ *    (http://jdbm.sourceforge.net/).
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE JDBM PROJECT AND CONTRIBUTORS
+ * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
+ * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
+ * CEES DE GROOT OR ANY CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+ * OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Copyright 2000 (C) Cees de Groot. All Rights Reserved.
+ * Copyright 2000-2001 (C) Alex Boisvert. All Rights Reserved.
+ * Contributions are Copyright (C) 2000 by their associated contributors.
+ *
+ * $Id: Provider.java,v 1.3 2005/06/25 23:12:32 doomdark Exp $
+ */
+
+package org.apache.hadoop.hive.ql.util.jdbm.recman;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.hive.ql.util.jdbm.RecordManager;
+import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerOptions;
+import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerProvider;
+
+import org.apache.hadoop.hive.ql.util.jdbm.helper.MRU;
+
+/**
+ * Provider of the default RecordManager implementation.
+ * <p>
+ * Builds a {@link BaseRecordManager}, optionally disables transactions,
+ * and wraps it in a {@link CacheRecordManager} with an MRU cache.
+ *
+ * @author <a href="mailto:boisvert@intalio.com">Alex Boisvert</a>
+ * @version $Id: Provider.java,v 1.3 2005/06/25 23:12:32 doomdark Exp $
+ */
+public final class Provider
+    implements RecordManagerProvider
+{
+
+    /**
+     * Create a default implementation record manager.
+     *
+     * @param name Name of the record file.
+     * @param options Record manager options.
+     * @throws IOException if an I/O related exception occurs while creating
+     *                    or opening the record manager.
+     * @throws UnsupportedOperationException if some options are not supported by the
+     *                                      implementation.
+     * @throws IllegalArgumentException if some options are invalid.
+     */
+    public RecordManager createRecordManager( String name,
+                                              Properties options )
+        throws IOException
+    {
+        RecordManager  recman;
+        String         value;
+        int            cacheSize;
+
+        recman = new BaseRecordManager( name );
+
+        // Transactions are enabled by default; "true" (case-insensitive)
+        // turns them off.
+        value = options.getProperty( RecordManagerOptions.DISABLE_TRANSACTIONS, "false" );
+        if ( value.equalsIgnoreCase( "TRUE" ) ) {
+            ( (BaseRecordManager) recman ).disableTransactions();
+        }
+
+        // NOTE(review): a non-numeric CACHE_SIZE value propagates an
+        // unchecked NumberFormatException rather than the documented
+        // IllegalArgumentException subtype message.
+        value = options.getProperty( RecordManagerOptions.CACHE_SIZE, "1000" );
+        cacheSize = Integer.parseInt( value );
+
+        // Only the normal (MRU) cache is implemented; soft/weak-reference
+        // cache options are recognized but rejected explicitly.
+        value = options.getProperty( RecordManagerOptions.CACHE_TYPE,
+                                     RecordManagerOptions.NORMAL_CACHE );
+        if ( value.equalsIgnoreCase( RecordManagerOptions.NORMAL_CACHE ) ) {
+            MRU cache = new MRU( cacheSize );
+            recman = new CacheRecordManager( recman, cache );
+        } else if ( value.equalsIgnoreCase( RecordManagerOptions.SOFT_REF_CACHE ) ) {
+            throw new IllegalArgumentException( "Soft reference cache not implemented" );
+        } else if ( value.equalsIgnoreCase( RecordManagerOptions.WEAK_REF_CACHE ) ) {
+            throw new IllegalArgumentException( "Weak reference cache not implemented" );
+        } else {
+            throw new IllegalArgumentException( "Invalid cache type: " + value );
+        }
+
+        return recman;
+    }
+
+
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordCache.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordCache.java?rev=768120&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordCache.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordCache.java Fri Apr 24 01:50:59 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * JDBM LICENSE v1.00
+ *
+ * Redistribution and use of this software and associated documentation
+ * ("Software"), with or without modification, are permitted provided
+ * that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain copyright
+ *    statements and notices.  Redistributions must also contain a
+ *    copy of this document.
+ *
+ * 2. Redistributions in binary form must reproduce the
+ *    above copyright notice, this list of conditions and the
+ *    following disclaimer in the documentation and/or other
+ *    materials provided with the distribution.
+ *
+ * 3. The name "JDBM" must not be used to endorse or promote
+ *    products derived from this Software without prior written
+ *    permission of Cees de Groot.  For written permission,
+ *    please contact cg@cdegroot.com.
+ *
+ * 4. Products derived from this Software may not be called "JDBM"
+ *    nor may "JDBM" appear in their names without prior written
+ *    permission of Cees de Groot. 
+ *
+ * 5. Due credit should be given to the JDBM Project
+ *    (http://jdbm.sourceforge.net/).
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE JDBM PROJECT AND CONTRIBUTORS
+ * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
+ * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
+ * CEES DE GROOT OR ANY CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+ * OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Copyright 2000 (C) Cees de Groot. All Rights Reserved.
+ * Contributions are Copyright (C) 2000 by their associated contributors.
+ *
+ * $Id: RecordCache.java,v 1.2 2005/06/25 23:12:32 doomdark Exp $
+ */
+
+package org.apache.hadoop.hive.ql.util.jdbm.recman;
+
+import java.io.IOException;
+
+/**
+ *  This interface is used for synchronization.
+ *  <p>
+ *  RecordManager ensures that the cache has the up-to-date information
+ *  by way of an invalidation protocol.
+ *  <p>
+ *  All notifications may perform I/O and therefore declare IOException.
+ */
+public interface RecordCache {
+
+    /**
+     * Notification to flush content related to a given record.
+     *
+     * @param recid identifier of the record whose cached content to flush
+     */
+    public void flush(long recid) throws IOException;
+
+    /**
+     * Notification to flush data of all records.
+     */
+    public void flushAll() throws IOException;
+
+    /**
+     * Notification to invalidate content related to given record.
+     *
+     * @param recid identifier of the record whose cached content to discard
+     */
+    public void invalidate(long recid) throws IOException;
+
+    /**
+     * Notification to invalidate content of all records.
+     */
+    public void invalidateAll() throws IOException;
+
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java?rev=768120&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java Fri Apr 24 01:50:59 2009
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * JDBM LICENSE v1.00
+ *
+ * Redistribution and use of this software and associated documentation
+ * ("Software"), with or without modification, are permitted provided
+ * that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain copyright
+ *    statements and notices.  Redistributions must also contain a
+ *    copy of this document.
+ *
+ * 2. Redistributions in binary form must reproduce the
+ *    above copyright notice, this list of conditions and the
+ *    following disclaimer in the documentation and/or other
+ *    materials provided with the distribution.
+ *
+ * 3. The name "JDBM" must not be used to endorse or promote
+ *    products derived from this Software without prior written
+ *    permission of Cees de Groot.  For written permission,
+ *    please contact cg@cdegroot.com.
+ *
+ * 4. Products derived from this Software may not be called "JDBM"
+ *    nor may "JDBM" appear in their names without prior written
+ *    permission of Cees de Groot.
+ *
+ * 5. Due credit should be given to the JDBM Project
+ *    (http://jdbm.sourceforge.net/).
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE JDBM PROJECT AND CONTRIBUTORS
+ * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
+ * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
+ * CEES DE GROOT OR ANY CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+ * OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Copyright 2000 (C) Cees de Groot. All Rights Reserved.
+ * Contributions are Copyright (C) 2000 by their associated contributors.
+ *
+ * $Id: RecordFile.java,v 1.6 2005/06/25 23:12:32 doomdark Exp $
+ */
+
+package org.apache.hadoop.hive.ql.util.jdbm.recman;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ *  This class represents a random access file as a set of fixed size
+ *  records. Each record has a physical record number, and records are
+ *  cached in order to improve access.
+ *<p>
+ *  The set of dirty records on the in-use list constitutes a transaction.
+ *  Later on, we will send these records to some recovery thingy.
+ */
+public final class RecordFile {
+    final TransactionManager txnMgr;
+
+    // Todo: reorganize in hashes and fifos as necessary.
+    // free -> inUse -> dirty -> inTxn -> free
+    // free is a cache, thus a FIFO. The rest are hashes.
+    private final LinkedList free = new LinkedList();
+    private final HashMap inUse = new HashMap();
+    private final HashMap dirty = new HashMap();
+    private final HashMap inTxn = new HashMap();
+
+    // transactions disabled?
+    private boolean transactionsDisabled = false;
+
+    /** The length of a single block. */
+    public final static int BLOCK_SIZE = 8192;//4096;
+
+    /** The extension of a record file */
+    final static String extension = ".db";
+
+    /** A block of clean data to wipe clean pages. */
+    final static byte[] cleanData = new byte[BLOCK_SIZE];
+
+    private RandomAccessFile file;
+    private final String fileName;
+
+    /**
+     *  Creates a new object on the indicated filename. The file is
+     *  opened in read/write mode.
+     *
+     *  @param fileName the name of the file to open or create, without
+     *         an extension.
+     *  @throws IOException whenever the creation of the underlying
+     *          RandomAccessFile throws it.
+     */
+    RecordFile(String fileName) throws IOException {
+        this.fileName = fileName;
+        file = new RandomAccessFile(fileName + extension, "rw");
+        txnMgr = new TransactionManager(this);
+    }
+
+    /**
+     *  Returns the file name.
+     */
+    String getFileName() {
+        return fileName;
+    }
+
+    /**
+     *  Disables transactions: doesn't sync and doesn't use the
+     *  transaction manager.
+     */
+    void disableTransactions() {
+        transactionsDisabled = true;
+    }
+
    /**
     *  Gets a block from the file. The returned byte array is
     *  the in-memory copy of the record, and thus can be written
     *  (and subsequently released with a dirty flag in order to
     *  write the block back).
     *  <p>
     *  The block is looked up in the in-transaction, dirty and free
     *  caches (in that order) before falling back to a file read; a
     *  cache hit moves the block onto the in-use list.
     *
     *  @param blockid The record number to retrieve.
     *  @return the in-memory copy of the block, marked clean.
     *  @throws IOException if reading the block from disk fails.
     */
     BlockIo get(long blockid) throws IOException {
         Long key = new Long(blockid);

         // try in transaction list, dirty list, free list
         BlockIo node = (BlockIo) inTxn.get(key);
         if (node != null) {
             inTxn.remove(key);
             inUse.put(key, node);
             return node;
         }
         node = (BlockIo) dirty.get(key);
         if (node != null) {
             dirty.remove(key);
             inUse.put(key, node);
             return node;
         }
         // the free list is not keyed, so scan for a matching block id
         for (Iterator i = free.iterator(); i.hasNext(); ) {
             BlockIo cur = (BlockIo) i.next();
             if (cur.getBlockId() == blockid) {
                 node = cur;
                 i.remove();
                 inUse.put(key, node);
                 return node;
             }
         }

         // sanity check: can't be on in use list
         if (inUse.get(key) != null) {
             throw new Error("double get for block " + blockid);
         }

         // get a new node and read it from the file
         node = getNewNode(blockid);
         long offset = blockid * BLOCK_SIZE;
         if (file.length() > 0 && offset <= file.length()) {
             read(file, offset, node.getData(), BLOCK_SIZE);
         } else {
             // block lies past the end of the file: hand out clean data
             System.arraycopy(cleanData, 0, node.getData(), 0, BLOCK_SIZE);
         }
         inUse.put(key, node);
         node.setClean();
         return node;
     }
+
+
+    /**
+     *  Releases a block.
+     *
+     *  @param blockid The record number to release.
+     *  @param isDirty If true, the block was modified since the get().
+     */
+    void release(long blockid, boolean isDirty)
+    throws IOException {
+        BlockIo node = (BlockIo) inUse.get(new Long(blockid));
+        if (node == null)
+            throw new IOException("bad blockid " + blockid + " on release");
+        if (!node.isDirty() && isDirty)
+            node.setDirty();
+        release(node);
+    }
+
+    /**
+     *  Releases a block.
+     *
+     *  @param block The block to release.
+     */
+    void release(BlockIo block) {
+        Long key = new Long(block.getBlockId());
+        inUse.remove(key);
+        if (block.isDirty()) {
+            // System.out.println( "Dirty: " + key + block );
+            dirty.put(key, block);
+        } else {
+            if (!transactionsDisabled && block.isInTransaction()) {
+                inTxn.put(key, block);
+            } else {
+                free.add(block);
+            }
+        }
+    }
+
+    /**
+     *  Discards a block (will not write the block even if it's dirty)
+     *
+     *  @param block The block to discard.
+     */
+    void discard(BlockIo block) {
+        Long key = new Long(block.getBlockId());
+        inUse.remove(key);
+
+        // note: block not added to free list on purpose, because
+        //       it's considered invalid
+    }
+
    /**
     *  Commits the current transaction by flushing all dirty buffers
     *  to disk.
     *  <p>
     *  With transactions disabled the dirty blocks are written straight
     *  to the data file and recycled; otherwise they are handed to the
     *  transaction manager and parked on the in-transaction list until
     *  the log is synchronized.
     *
     *  @throws IOException if writing the file or the log fails.
     */
    void commit() throws IOException {
        // debugging...
        // NOTE(review): tolerates exactly one block still in use at
        // commit time -- presumably a block legitimately held by the
        // caller; confirm before tightening this check.
        if (!inUse.isEmpty() && inUse.size() > 1) {
            showList(inUse.values().iterator());
            throw new Error("in use list not empty at commit time ("
                            + inUse.size() + ")");
        }

        //  System.out.println("committing...");

        if ( dirty.size() == 0 ) {
            // if no dirty blocks, skip commit process
            return;
        }

        if (!transactionsDisabled) {
            txnMgr.start();
        }

        for (Iterator i = dirty.values().iterator(); i.hasNext(); ) {
            BlockIo node = (BlockIo) i.next();
            i.remove();
            // System.out.println("node " + node + " map size now " + dirty.size());
            if (transactionsDisabled) {
                // no log: write the block directly and recycle it
                long offset = node.getBlockId() * BLOCK_SIZE;
                file.seek(offset);
                file.write(node.getData());
                node.setClean();
                free.add(node);
            }
            else {
                txnMgr.add(node);
                inTxn.put(new Long(node.getBlockId()), node);
            }
        }
        if (!transactionsDisabled) {
            txnMgr.commit();
        }
    }
+
+    /**
+     *  Rollback the current transaction by discarding all dirty buffers
+     */
+    void rollback() throws IOException {
+        // debugging...
+        if (!inUse.isEmpty()) {
+            showList(inUse.values().iterator());
+            throw new Error("in use list not empty at rollback time ("
+                            + inUse.size() + ")");
+        }
+        //  System.out.println("rollback...");
+        dirty.clear();
+
+        txnMgr.synchronizeLogFromDisk();
+
+        if (!inTxn.isEmpty()) {
+            showList(inTxn.values().iterator());
+            throw new Error("in txn list not empty at rollback time ("
+                            + inTxn.size() + ")");
+        };
+    }
+
    /**
     *  Commits any outstanding modifications and closes the file.
     *  <p>
     *  The transaction manager is shut down first (flushing its log to
     *  the data file); afterwards the dirty, in-transaction and in-use
     *  lists must all be empty or an Error is raised.
     *
     *  @throws IOException if the final commit or the close fails.
     */
    void close() throws IOException {
        if (!dirty.isEmpty()) {
            commit();
        }
        txnMgr.shutdown();

        if (!inTxn.isEmpty()) {
            showList(inTxn.values().iterator());
            throw new Error("In transaction not empty");
        }

        // these actually ain't that bad in a production release
        // (defensive re-check: commit() above should have drained dirty)
        if (!dirty.isEmpty()) {
            System.out.println("ERROR: dirty blocks at close time");
            showList(dirty.values().iterator());
            throw new Error("Dirty blocks at close time");
        }
        if (!inUse.isEmpty()) {
            System.out.println("ERROR: inUse blocks at close time");
            showList(inUse.values().iterator());
            throw new Error("inUse blocks at close time");
        }

        // debugging stuff to keep an eye on the free list
        // System.out.println("Free list size:" + free.size());
        file.close();
        file = null;
    }
+
+
+    /**
+     * Force closing the file and underlying transaction manager.
+     * Used for testing purposed only.
+     */
+    void forceClose() throws IOException {
+      txnMgr.forceClose();
+      file.close();
+    }
+
+    /**
+     *  Prints contents of a list
+     */
+    private void showList(Iterator i) {
+        int cnt = 0;
+        while (i.hasNext()) {
+            System.out.println("elem " + cnt + ": " + i.next());
+            cnt++;
+        }
+    }
+
+
+    /**
+     *  Returns a new node. The node is retrieved (and removed)
+     *  from the released list or created new.
+     */
+    private BlockIo getNewNode(long blockid)
+    throws IOException {
+
+        BlockIo retval = null;
+        if (!free.isEmpty()) {
+            retval = (BlockIo) free.removeFirst();
+        }
+        if (retval == null)
+            retval = new BlockIo(0, new byte[BLOCK_SIZE]);
+
+        retval.setBlockId(blockid);
+        retval.setView(null);
+        return retval;
+    }
+
+    /**
+     *  Synchs a node to disk. This is called by the transaction manager's
+     *  synchronization code.
+     */
+    void synch(BlockIo node) throws IOException {
+        byte[] data = node.getData();
+        if (data != null) {
+            long offset = node.getBlockId() * BLOCK_SIZE;
+            file.seek(offset);
+            file.write(data);
+        }
+    }
+
+    /**
+     *  Releases a node from the transaction list, if it was sitting
+     *  there.
+     *
+     *  @param recycle true if block data can be reused
+     */
+    void releaseFromTransaction(BlockIo node, boolean recycle)
+    throws IOException {
+        Long key = new Long(node.getBlockId());
+        if ((inTxn.remove(key) != null) && recycle) {
+            free.add(node);
+        }
+    }
+
+    /**
+     *  Synchronizes the file.
+     */
+    void sync() throws IOException {
+        file.getFD().sync();
+    }
+
+
+    /**
+     * Utility method: Read a block from a RandomAccessFile
+     */
+    private static void read(RandomAccessFile file, long offset,
+                             byte[] buffer, int nBytes) throws IOException {
+        file.seek(offset);
+        int remaining = nBytes;
+        int pos = 0;
+        while (remaining > 0) {
+            int read = file.read(buffer, pos, remaining);
+            if (read == -1) {
+                System.arraycopy(cleanData, 0, buffer, pos, remaining);
+                break;
+            }
+            remaining -= read;
+            pos += read;
+        }
+    }
+
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordHeader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordHeader.java?rev=768120&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordHeader.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordHeader.java Fri Apr 24 01:50:59 2009
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * JDBM LICENSE v1.00
+ *
+ * Redistribution and use of this software and associated documentation
+ * ("Software"), with or without modification, are permitted provided
+ * that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain copyright
+ *    statements and notices.  Redistributions must also contain a
+ *    copy of this document.
+ *
+ * 2. Redistributions in binary form must reproduce the
+ *    above copyright notice, this list of conditions and the
+ *    following disclaimer in the documentation and/or other
+ *    materials provided with the distribution.
+ *
+ * 3. The name "JDBM" must not be used to endorse or promote
+ *    products derived from this Software without prior written
+ *    permission of Cees de Groot.  For written permission,
+ *    please contact cg@cdegroot.com.
+ *
+ * 4. Products derived from this Software may not be called "JDBM"
+ *    nor may "JDBM" appear in their names without prior written
+ *    permission of Cees de Groot. 
+ *
+ * 5. Due credit should be given to the JDBM Project
+ *    (http://jdbm.sourceforge.net/).
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE JDBM PROJECT AND CONTRIBUTORS
+ * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
+ * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
+ * CEES DE GROOT OR ANY CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+ * OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Copyright 2000 (C) Cees de Groot. All Rights Reserved.
+ * Contributions are Copyright (C) 2000 by their associated contributors.
+ *
+ * $Id: RecordHeader.java,v 1.1 2000/05/06 00:00:31 boisvert Exp $
+ */
+
+package org.apache.hadoop.hive.ql.util.jdbm.recman;
+
+/**
+ *  The data that comes at the start of a record of data. It stores 
+ *  both the current size and the avaliable size for the record - the latter
+ *  can be bigger than the former, which allows the record to grow without
+ *  needing to be moved and which allows the system to put small records
+ *  in larger free spots.
+ */
+class RecordHeader {
+    // offsets
+    private static final short O_CURRENTSIZE = 0; // int currentSize
+    private static final short O_AVAILABLESIZE = Magic.SZ_INT; // int availableSize
+    static final int SIZE = O_AVAILABLESIZE + Magic.SZ_INT;
+    
+    // my block and the position within the block
+    private BlockIo block;
+    private short pos;
+
+    /**
+     *  Constructs a record header from the indicated data starting at
+     *  the indicated position.
+     */
+    RecordHeader(BlockIo block, short pos) {
+        this.block = block;
+        this.pos = pos;
+        if (pos > (RecordFile.BLOCK_SIZE - SIZE))
+            throw new Error("Offset too large for record header (" 
+                            + block.getBlockId() + ":" 
+                            + pos + ")");
+    }
+
+    /** Returns the current size */
+    int getCurrentSize() {
+        return block.readInt(pos + O_CURRENTSIZE);
+    }
+    
+    /** Sets the current size */
+    void setCurrentSize(int value) {
+        block.writeInt(pos + O_CURRENTSIZE, value);
+    }
+    
+    /** Returns the available size */
+    int getAvailableSize() {
+        return block.readInt(pos + O_AVAILABLESIZE);
+    }
+    
+    /** Sets the available size */
+    void setAvailableSize(int value) {
+        block.writeInt(pos + O_AVAILABLESIZE, value);
+    }
+
+    // overrides java.lang.Object
+    public String toString() {
+        return "RH(" + block.getBlockId() + ":" + pos 
+            + ", avl=" + getAvailableSize()
+            + ", cur=" + getCurrentSize() 
+            + ")";
+    }
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TransactionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TransactionManager.java?rev=768120&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TransactionManager.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TransactionManager.java Fri Apr 24 01:50:59 2009
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * JDBM LICENSE v1.00
+ *
+ * Redistribution and use of this software and associated documentation
+ * ("Software"), with or without modification, are permitted provided
+ * that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain copyright
+ *    statements and notices.  Redistributions must also contain a
+ *    copy of this document.
+ *
+ * 2. Redistributions in binary form must reproduce the
+ *    above copyright notice, this list of conditions and the
+ *    following disclaimer in the documentation and/or other
+ *    materials provided with the distribution.
+ *
+ * 3. The name "JDBM" must not be used to endorse or promote
+ *    products derived from this Software without prior written
+ *    permission of Cees de Groot.  For written permission,
+ *    please contact cg@cdegroot.com.
+ *
+ * 4. Products derived from this Software may not be called "JDBM"
+ *    nor may "JDBM" appear in their names without prior written
+ *    permission of Cees de Groot.
+ *
+ * 5. Due credit should be given to the JDBM Project
+ *    (http://jdbm.sourceforge.net/).
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE JDBM PROJECT AND CONTRIBUTORS
+ * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
+ * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
+ * CEES DE GROOT OR ANY CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+ * OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Copyright 2000 (C) Cees de Groot. All Rights Reserved.
+ * Contributions are Copyright (C) 2000 by their associated contributors.
+ *
+ * $Id: TransactionManager.java,v 1.7 2005/06/25 23:12:32 doomdark Exp $
+ */
+
+package org.apache.hadoop.hive.ql.util.jdbm.recman;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ *  This class manages the transaction log that belongs to every
+ *  {@link RecordFile}. The transaction log is either clean, or
+ *  in progress. In the latter case, the transaction manager
+ *  takes care of a roll forward.
+ *<p>
+ *  Implementation note: this is a proof-of-concept implementation
+ *  which hasn't been optimized for speed. For instance, all sorts
+ *  of streams are created for every transaction.
+ */
+// TODO: Handle the case where we are recovering lg9 and lg0, were we
+// should start with lg9 instead of lg0!
+
+public final class TransactionManager {
+    private RecordFile owner;
+
+    // streams for transaction log.
+    private FileOutputStream fos;
+    private ObjectOutputStream oos;
+
+    /** 
+     * By default, we keep 10 transactions in the log file before
+     * synchronizing it with the main database file.
+     */
+    static final int DEFAULT_TXNS_IN_LOG = 10;
+
+    /** 
+     * Maximum number of transactions before the log file is
+     * synchronized with the main database file.
+     */
+    private int _maxTxns = DEFAULT_TXNS_IN_LOG;
+
+    /**
+     * In-core copy of transactions. We could read everything back from
+     * the log file, but the RecordFile needs to keep the dirty blocks in
+     * core anyway, so we might as well point to them and spare us a lot
+     * of hassle.
+     */
+    private ArrayList[] txns = new ArrayList[DEFAULT_TXNS_IN_LOG];
+    private int curTxn = -1;
+
+    /** Extension of a log file. */
+    static final String extension = ".lg";
+
+    /**
+     *  Instantiates a transaction manager instance. If recovery
+     *  needs to be performed, it is done.
+     *
+     *  @param owner the RecordFile instance that owns this transaction mgr.
+     */
+    TransactionManager(RecordFile owner) throws IOException {
+        this.owner = owner;
+        recover();
+        open();
+    }
+
+    
+    /**
+     * Synchronize log file data with the main database file.
+     * <p>
+     * After this call, the main database file is guaranteed to be 
+     * consistent and guaranteed to be the only file needed for 
+     * backup purposes.
+     */
+    public void synchronizeLog()
+        throws IOException
+    {
+        synchronizeLogFromMemory();
+    }
+
+    
+    /**
+     * Set the maximum number of transactions to record in
+     * the log (and keep in memory) before the log is
+     * synchronized with the main database file.
+     * <p>
+     * This method must be called while there are no
+     * pending transactions in the log.
+     */
+    public void setMaximumTransactionsInLog( int maxTxns )
+        throws IOException
+    {
+        if ( maxTxns <= 0 ) {
+            throw new IllegalArgumentException( 
+                "Argument 'maxTxns' must be greater than 0." );
+        }
+        if ( curTxn != -1 ) {
+            throw new IllegalStateException( 
+                "Cannot change setting while transactions are pending in the log" );
+        }
+        _maxTxns = maxTxns;
+        txns = new ArrayList[ maxTxns ];
+    }
+
+    
+    /** Builds logfile name  */
+    private String makeLogName() {
+        return owner.getFileName() + extension;
+    }
+
+
+    /** Synchs in-core transactions to data file and opens a fresh log */
+    private void synchronizeLogFromMemory() throws IOException {
+        close();
+
+        TreeSet blockList = new TreeSet( new BlockIoComparator() );
+
+        int numBlocks = 0;
+        int writtenBlocks = 0;
+        for (int i = 0; i < _maxTxns; i++) {
+            if (txns[i] == null)
+                continue;
+            // Add each block to the blockList, replacing the old copy of this
+            // block if necessary, thus avoiding writing the same block twice
+            for (Iterator k = txns[i].iterator(); k.hasNext(); ) {
+                BlockIo block = (BlockIo)k.next();
+                if ( blockList.contains( block ) ) {
+                    block.decrementTransactionCount();
+                }
+                else {
+                    writtenBlocks++;
+                    boolean result = blockList.add( block );
+                }
+                numBlocks++;
+            }
+
+            txns[i] = null;
+        }
+        // Write the blocks from the blockList to disk
+        synchronizeBlocks(blockList.iterator(), true);
+
+        owner.sync();
+        open();
+    }
+
+
+    /** Opens the log file */
+    private void open() throws IOException {
+        fos = new FileOutputStream(makeLogName());
+        oos = new ObjectOutputStream(fos);
+        oos.writeShort(Magic.LOGFILE_HEADER);
+        oos.flush();
+        curTxn = -1;
+    }
+
+    /** Startup recovery on all files */
+    private void recover() throws IOException {
+        String logName = makeLogName();
+        File logFile = new File(logName);
+        if (!logFile.exists())
+            return;
+        if (logFile.length() == 0) {
+            logFile.delete();
+            return;
+        }
+
+        FileInputStream fis = new FileInputStream(logFile);
+        ObjectInputStream ois = new ObjectInputStream(fis);
+
+        try {
+            if (ois.readShort() != Magic.LOGFILE_HEADER)
+                throw new Error("Bad magic on log file");
+        } catch (IOException e) {
+            // corrupted/empty logfile
+            logFile.delete();
+            return;
+        }
+
+        while (true) {
+            ArrayList blocks = null;
+            try {
+                blocks = (ArrayList) ois.readObject();
+            } catch (ClassNotFoundException e) {
+                throw new Error("Unexcepted exception: " + e);
+            } catch (IOException e) {
+                // corrupted logfile, ignore rest of transactions
+                break;
+            }
+            synchronizeBlocks(blocks.iterator(), false);
+
+            // ObjectInputStream must match exactly each
+            // ObjectOutputStream created during writes
+            try {
+                ois = new ObjectInputStream(fis);
+            } catch (IOException e) {
+                // corrupted logfile, ignore rest of transactions
+                break;
+            }
+        }
+        owner.sync();
+        logFile.delete();
+    }
+
+    /** Synchronizes the indicated blocks with the owner. */
+    private void synchronizeBlocks(Iterator blockIterator, boolean fromCore)
+    throws IOException {
+        // write block vector elements to the data file.
+        while ( blockIterator.hasNext() ) {
+            BlockIo cur = (BlockIo)blockIterator.next();
+            owner.synch(cur);
+            if (fromCore) {
+                cur.decrementTransactionCount();
+                if (!cur.isInTransaction()) {
+                    owner.releaseFromTransaction(cur, true);
+                }
+            }
+        }
+    }
+
+
+    /** Set clean flag on the blocks. */
+    private void setClean(ArrayList blocks)
+    throws IOException {
+        for (Iterator k = blocks.iterator(); k.hasNext(); ) {
+            BlockIo cur = (BlockIo) k.next();
+            cur.setClean();
+        }
+    }
+
+    /** Discards the indicated blocks and notify the owner. */
+    private void discardBlocks(ArrayList blocks)
+    throws IOException {
+        for (Iterator k = blocks.iterator(); k.hasNext(); ) {
+            BlockIo cur = (BlockIo) k.next();
+            cur.decrementTransactionCount();
+            if (!cur.isInTransaction()) {
+                owner.releaseFromTransaction(cur, false);
+            }
+        }
+    }
+
+    /**
+     *  Starts a transaction. This can block if all slots have been filled
+     *  with full transactions, waiting for the synchronization thread to
+     *  clean out slots.
+     */
+    void start() throws IOException {
+        curTxn++;
+        if (curTxn == _maxTxns) {
+            synchronizeLogFromMemory();
+            curTxn = 0;
+        }
+        txns[curTxn] = new ArrayList();
+    }
+
+    /**
+     *  Indicates the block is part of the transaction.
+     */
+    void add(BlockIo block) throws IOException {
+        block.incrementTransactionCount();
+        txns[curTxn].add(block);
+    }
+
+    /**
+     *  Commits the transaction to the log file.
+     */
+    void commit() throws IOException {
+        oos.writeObject(txns[curTxn]);
+        sync();
+
+        // set clean flag to indicate blocks have been written to log
+        setClean(txns[curTxn]);
+
+        // open a new ObjectOutputStream in order to store
+        // newer states of BlockIo
+        oos = new ObjectOutputStream(fos);
+    }
+
+    /** Flushes and syncs */
+    private void sync() throws IOException {
+        oos.flush();
+        fos.flush();
+        fos.getFD().sync();
+    }
+
+    /**
+     *  Shutdowns the transaction manager. Resynchronizes outstanding
+     *  logs.
+     */
+    void shutdown() throws IOException {
+        synchronizeLogFromMemory();
+        close();
+    }
+
+    /**
+     *  Closes open files.
+     */
+    private void close() throws IOException {
+        sync();
+        oos.close();
+        fos.close();
+        oos = null;
+        fos = null;
+    }
+
+    /**
+     * Force closing the file without synchronizing pending transaction data.
+     * Used for testing purposes only.
+     */
+    void forceClose() throws IOException {
+        oos.close();
+        fos.close();
+        oos = null;
+        fos = null;
+    }
+
+    /**
+     * Use the disk-based transaction log to synchronize the data file.
+     * Outstanding memory logs are discarded because they are believed
+     * to be inconsistent.
+     */
+    void synchronizeLogFromDisk() throws IOException {
+        close();
+
+        for ( int i=0; i < _maxTxns; i++ ) {
+            if (txns[i] == null)
+                continue;
+            discardBlocks(txns[i]);
+            txns[i] = null;
+        }
+
+        recover();
+        open();
+    }
+
+
+    /** INNER CLASS.
+     *  Comparator class for use by the tree set used to store the blocks
+     *  to write for this transaction.  The BlockIo objects are ordered by
+     *  their blockIds.
+     */
+    public static class BlockIoComparator
+        implements Comparator
+    {
+
+        public int compare( Object o1, Object o2 ) {
+            BlockIo block1 = (BlockIo)o1;
+            BlockIo block2 = (BlockIo)o2;
+            int result = 0;
+            if ( block1.getBlockId() == block2.getBlockId() ) {
+                result = 0;
+            }
+            else if ( block1.getBlockId() < block2.getBlockId() ) {
+                result = -1;
+            }
+            else {
+                result = 1;
+            }
+            return result;
+        }
+
+        public boolean equals(Object obj) {
+            return super.equals(obj);
+        }
+    } // class BlockIOComparator
+
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TranslationPage.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TranslationPage.java?rev=768120&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TranslationPage.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TranslationPage.java Fri Apr 24 01:50:59 2009
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * JDBM LICENSE v1.00
+ *
+ * Redistribution and use of this software and associated documentation
+ * ("Software"), with or without modification, are permitted provided
+ * that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain copyright
+ *    statements and notices.  Redistributions must also contain a
+ *    copy of this document.
+ *
+ * 2. Redistributions in binary form must reproduce the
+ *    above copyright notice, this list of conditions and the
+ *    following disclaimer in the documentation and/or other
+ *    materials provided with the distribution.
+ *
+ * 3. The name "JDBM" must not be used to endorse or promote
+ *    products derived from this Software without prior written
+ *    permission of Cees de Groot.  For written permission,
+ *    please contact cg@cdegroot.com.
+ *
+ * 4. Products derived from this Software may not be called "JDBM"
+ *    nor may "JDBM" appear in their names without prior written
+ *    permission of Cees de Groot. 
+ *
+ * 5. Due credit should be given to the JDBM Project
+ *    (http://jdbm.sourceforge.net/).
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE JDBM PROJECT AND CONTRIBUTORS
+ * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
+ * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
+ * CEES DE GROOT OR ANY CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+ * OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Copyright 2000 (C) Cees de Groot. All Rights Reserved.
+ * Contributions are Copyright (C) 2000 by their associated contributors.
+ *
+ * $Id: TranslationPage.java,v 1.1 2000/05/06 00:00:31 boisvert Exp $
+ */
+
+package org.apache.hadoop.hive.ql.util.jdbm.recman;
+
+/**
+ *  Class describing a page that holds translations from physical rowids
+ *  to logical rowids. In fact, the page just holds physical rowids - the
+ *  page's block is the block for the logical rowid, and the offsets serve
+ *  as offsets for the rowids.
+ */
+final class TranslationPage extends PageHeader {
+    // offsets
+    static final short O_TRANS = PageHeader.SIZE; // short count
+    // Number of physical rowids that fit on a single page.
+    static final short ELEMS_PER_PAGE = 
+        (RecordFile.BLOCK_SIZE - O_TRANS) / PhysicalRowId.SIZE;
+    
+    // Lazily created views over the rowid slots; an entry is instantiated
+    // on first access by get() and cached for reuse.
+    final PhysicalRowId[] slots = new PhysicalRowId[ELEMS_PER_PAGE];
+
+    /**
+     *  Constructs a data page view from the indicated block.
+     *
+     *  @param block the block backing this translation page
+     */
+    TranslationPage(BlockIo block) {
+        super(block);
+    }
+
+    /**
+     *  Factory method to create or return a data page for the
+     *  indicated block. Reuses the block's existing view when it is
+     *  already a TranslationPage, avoiding a redundant allocation.
+     *
+     *  @param block the block to view
+     *  @return a TranslationPage view over the block
+     */
+    static TranslationPage getTranslationPageView(BlockIo block) {
+        BlockView view = block.getView();
+        // instanceof already yields false for null, so no separate
+        // null check is required.
+        if (view instanceof TranslationPage)
+            return (TranslationPage) view;
+        else
+            return new TranslationPage(block);
+    }
+
+    /**
+     *  Returns the value of the indicated rowid on the page.
+     *
+     *  @param offset byte offset of the rowid within the block; expected
+     *         to be O_TRANS plus a multiple of PhysicalRowId.SIZE
+     *  @return the (cached) PhysicalRowId view at that offset
+     */
+    PhysicalRowId get(short offset) {
+        int slot = (offset - O_TRANS) / PhysicalRowId.SIZE;
+        if (slots[slot] == null) 
+            slots[slot] = new PhysicalRowId(block, offset);
+        return slots[slot];
+    }
+}



Mime
View raw message