db-derby-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kahat...@apache.org
Subject svn commit: r568121 - in /db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication: ./ buffer/ buffer/LogBufferElement.java buffer/LogBufferFullException.java buffer/ReplicationLogBuffer.java
Date Tue, 21 Aug 2007 13:34:42 GMT
Author: kahatlen
Date: Tue Aug 21 06:34:40 2007
New Revision: 568121

URL: http://svn.apache.org/viewvc?rev=568121&view=rev
Log:
DERBY-2926: Replication: Add a log buffer for log records that should
be shipped to the slave

Contributed by Jørgen Løland.

Added:
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferElement.java
  (with props)
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferFullException.java
  (with props)
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java
  (with props)

Added: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferElement.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferElement.java?rev=568121&view=auto
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferElement.java
(added)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferElement.java
Tue Aug 21 06:34:40 2007
@@ -0,0 +1,193 @@
+/*
+
+   Derby - Class org.apache.derby.impl.services.replication.buffer.LogBufferElement
+
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to you under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+ */
+
+package org.apache.derby.impl.services.replication.buffer;
+
+import org.apache.derby.iapi.services.sanity.SanityManager;
+
+/**
+ * ReplicationLogBuffer consists of n LogBufferElements, each of which
+ * can store m log records in a single byte[].
+ *
+ * In addition to adding the log record information to the byte[], the
+ * greatestInstant variable is updated for every append so that
+ * getLastInstant can be used to get the highest log instant in this
+ * LogBufferElement.
+ */
+
+class LogBufferElement {
+
+    private final byte[] bufferdata; // serialized log records
+    private int position;            // index of first unused byte in bufferdata
+    private long greatestInstant;    // instant passed to the latest append
+    private boolean recycleMe; // put back in freeBuffers when sent to slave?
+
+    /** Creates an empty, recyclable element holding up to bufferSize bytes. */
+    protected LogBufferElement(int bufferSize){
+        bufferdata = new byte[bufferSize];
+        init();
+    }
+
+    /**
+     * Resets position, last instant and the recycle flag. The bytes in the
+     * buffer are not cleared. Should be called before an element is reused.
+     */
+    protected void init() {
+        this.position = 0;
+        greatestInstant = 0;
+        recycleMe = true; //always recycle unless explicitly told otherwise
+    }
+
+    /**
+     * Append a single log record to this LogBufferElement.
+     * The payload bytes are read starting at the given offsets.
+     * @param instant               the log address of this log record.
+     * @param dataLength            number of bytes in data[]
+     * @param dataOffset            offset in data[] to start copying from.
+     * @param optionalDataLength    number of bytes in optionalData[]
+     * @param optionalDataOffset    offset in optionalData[] to start copy from
+     * @param data                  "from" array to copy "data" portion of rec
+     * @param optionalData          "from" array to copy "optional data" from
+     **/
+    protected void appendLogRecord(long instant,
+                                int dataLength,
+                                int dataOffset,
+                                int optionalDataLength,
+                                int optionalDataOffset,
+                                byte[] data,
+                                byte[] optionalData){
+
+        if (SanityManager.DEBUG){
+            int totalSize = dataLength + optionalDataLength +
+                ReplicationLogBuffer.LOG_RECORD_FIXED_OVERHEAD_SIZE;
+            SanityManager.ASSERT(freeSize() >= totalSize,
+                                 "Log record does not fit into"+
+                                 " this LogBufferElement");
+        }
+        // fixed-size header: one long and four ints (8 + 4*4 = 24 bytes)
+        position = appendLong(instant, position);
+        position = appendInt(dataLength, position);
+        position = appendInt(dataOffset, position);
+        position = appendInt(optionalDataLength, position);
+        position = appendInt(optionalDataOffset, position);
+
+        if (dataLength > 0){
+            position = appendBytes(data, dataOffset, position, dataLength);
+        }
+        if (optionalDataLength > 0) {
+            position = appendBytes(optionalData, optionalDataOffset,
+                                   position, optionalDataLength);
+        }
+        // assumes records are appended in ascending instant order - TODO confirm
+        this.greatestInstant = instant;
+    }
+
+    /**
+     * @return the backing byte[] itself (not a copy); only its first size()
+     * bytes contain appended log records
+     */
+    protected byte[] getData(){
+        return bufferdata;
+    }
+
+    /**
+     * @return instant of the last appended record; 0 if empty since init()
+     */
+    protected long getLastInstant(){
+        return greatestInstant;
+    }
+
+    /**
+     * @return Number of unused bytes in this LogBufferElement
+     */
+    protected int freeSize(){
+        return bufferdata.length - position;
+    }
+
+    /**
+     * @return Number of used bytes in this LogBufferElement
+     */
+    protected int size(){
+        return position;
+    }
+
+    /**
+     * @return true if this element goes back to freeBuffers once consumed
+     */
+    protected boolean isRecyclable(){
+        return recycleMe;
+    }
+
+    /** Sets whether this element should be reused after being consumed. */
+    protected void setRecyclable(boolean r){
+        recycleMe = r;
+    }
+
+    /*
+     * The append methods should be changed to use java.nio.ByteBuffer
+     * if it is decided that replication will never use j2me. We use
+     * our own implementation for now so that j2me is not blocked.
+     */
+
+    /**
+     * Copy length bytes of b, starting at offset, into the buffer at pos.
+     * @return new position
+     */
+    private int appendBytes(byte b[], int offset, int pos, int length) {
+        if (SanityManager.DEBUG){
+            // the room left after pos must hold all length bytes
+            SanityManager.ASSERT(bufferdata.length - pos >= length,
+                                 "byte[] is to big to fit into this buffer");
+            SanityManager.ASSERT(b != null, "Cannot append null to buffer");
+        }
+        System.arraycopy(b, offset, bufferdata, pos, length);
+        return pos + length;
+    }
+
+    /**
+     * Append an int to this LogBufferElement, most significant byte first.
+     * @return new position
+     */
+    private int appendInt(int i, int p) {
+        bufferdata[p++] = (byte) (i >> 24);
+        bufferdata[p++] = (byte) (i >> 16);
+        bufferdata[p++] = (byte) (i >> 8);
+        bufferdata[p++] = (byte) i;
+        return p;
+    }
+
+    /**
+     * Append a long to this LogBufferElement, most significant byte first.
+     * @return new position
+     */
+    private int appendLong(long l, int p) {
+        bufferdata[p++] = (byte) (l >> 56);
+        bufferdata[p++] = (byte) (l >> 48);
+        bufferdata[p++] = (byte) (l >> 40);
+        bufferdata[p++] = (byte) (l >> 32);
+        bufferdata[p++] = (byte) (l >> 24);
+        bufferdata[p++] = (byte) (l >> 16);
+        bufferdata[p++] = (byte) (l >> 8);
+        bufferdata[p++] = (byte) l;
+        return p;
+    }
+
+}

Propchange: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferElement.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferFullException.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferFullException.java?rev=568121&view=auto
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferFullException.java
(added)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferFullException.java
Tue Aug 21 06:34:40 2007
@@ -0,0 +1,28 @@
+/*
+
+   Derby - Class org.apache.derby.impl.services.replication.buffer.LogBufferFullException
+
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to you under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+ */
+
+package org.apache.derby.impl.services.replication.buffer;
+
+/** Thrown when the replication log buffer has no free space for more log. */
+public class LogBufferFullException extends Exception {
+    /** Creates the exception with no detail message and no cause. */
+    public LogBufferFullException() { super(); }
+}

Propchange: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferFullException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java?rev=568121&view=auto
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java
(added)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java
Tue Aug 21 06:34:40 2007
@@ -0,0 +1,363 @@
+/*
+
+   Derby - Class org.apache.derby.impl.services.replication.buffer.ReplicationLogBuffer
+
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to you under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+*/
+
+package org.apache.derby.impl.services.replication.buffer;
+
+import org.apache.derby.iapi.services.sanity.SanityManager;
+
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+import java.util.LinkedList;
+
+/**
+ * Used for the replication master role only. When a Derby instance
+ * has the replication master role for a database 'x', all log records
+ * that are written to the local log file are also appended to this
+ * log buffer. The replication master service will consume chunks of
+ * log from this buffer and send it to the Derby instance with the
+ * slave role for 'x'.
+ *
+ * ReplicationLogBuffer consists of a number of LogBufferElements.
+ * Elements that are not in use are in the freeBuffers list, while
+ * elements that contains dirty log are in dirtyBuffers. Log records
+ * are appended to the buffer element in currentDirtyBuffer. Hence,
+ * the life cycle of buffer elements is:
+ * freeBuffers -> currentDirtyBuffer -> dirtyBuffers -> freeBuffers
+ *
+ * To append log records to the buffer, use appendLogRecord(...)
+ *
+ * To consume chunks of log records, use next() followed by getData(),
+ * getLastInstant() and getSize(). These get-methods throw
+ * NoSuchElementException if next() returned false, meaning that there
+ * were no dirty log at the time next() was called.
+ *
+ * Threads: ReplicationLogBuffer is threadsafe. It can be used by a
+ * logger (LogToFile) and a log consumer (LogShipping service)
+ * concurrently without further synchronization.
+ */
+
+public class ReplicationLogBuffer {
+
+    private static final int DEFAULT_NUMBER_LOG_BUFFERS = 10;
+
+    protected static final int LOG_RECORD_FIXED_OVERHEAD_SIZE = 24;
+    // long instant           - 8
+    // int dataLength         - 4
+    // int dataOffset         - 4
+    // int optionalDataLength - 4
+    // int optionalDataOffset - 4
+
+    private final LinkedList dirtyBuffers;// LogBufferElements with unsent log
+    private final LinkedList freeBuffers; // currently unused LogBufferElements
+
+    // the buffer we currently APPEND log records to
+    private LogBufferElement currentDirtyBuffer;
+
+    // used to GET data from this buffer. next() sets these
+    private boolean validOutBuffer; // outBuffer contains valid data
+    private byte[] outBufferData;   // the buffer contents
+    private int outBufferStored;    // number of bytes currently stored
+    private long outBufferLastInstant;// highest instant (LSN) in outBufferData
+
+    // Two objects to synchronize on so that the logger (LogToFile)
+    // and the log consumer (LogShipping service) can use the buffer
+    // concurrently (although appendLogRecord may conflict with next).
+    // In cases where both latches are needed at the same time,
+    // listLatch is always set first to avoid deadlock. listLatch is
+    // used by appendLogRecord and next to synchronize operations on
+    // the free and dirty buffer lists and on currentDirtyBuffer.
+    // outputLatch is used by next and getXXX to guard the output data variables
+    private final Object listLatch = new Object();
+    private final Object outputLatch = new Object();
+
+    private final int defaultBufferSize;
+
+    /** Creates a log buffer whose elements each hold bufferSize bytes. */
+    public ReplicationLogBuffer(int bufferSize) {
+        defaultBufferSize = bufferSize;
+
+        outBufferData = new byte[bufferSize];
+        outBufferStored = 0;
+        outBufferLastInstant = 0;
+        validOutBuffer = false; // no valid data in outBuffer yet
+
+        dirtyBuffers = new LinkedList();
+        freeBuffers = new LinkedList();
+
+        for (int i = 0; i < DEFAULT_NUMBER_LOG_BUFFERS; i++){
+            LogBufferElement b = new LogBufferElement(bufferSize);
+            freeBuffers.addLast(b);
+        }
+        currentDirtyBuffer = (LogBufferElement)freeBuffers.removeFirst();
+    }
+
+    /**
+     * Append a single log record to the log buffer.
+     *
+     * @param instant               the log address of this log record.
+     * @param dataLength            number of bytes in data[]
+     * @param dataOffset            offset in data[] to start copying from.
+     * @param optionalDataLength    number of bytes in optionalData[]
+     * @param optionalDataOffset    offset in optionalData[] to start copy from
+     * @param data                  "from" array to copy "data" portion of rec
+     * @param optionalData          "from" array to copy "optional data" from
+     *
+     * @throws LogBufferFullException - thrown if there is not enough
+     * free space in the buffer to store the log record.
+     **/
+    public void appendLogRecord(long instant,
+                                int dataLength,
+                                int dataOffset,
+                                int optionalDataLength,
+                                int optionalDataOffset,
+                                byte[] data,
+                                byte[] optionalData)
+        throws LogBufferFullException{
+
+        /* format of log to write:
+         *
+         * (long)   instant
+         * (int)    dataLength
+         * (int)    dataOffset
+         * (int)    optionalDataLength
+         * (int)    optionalDataOffset
+         * (byte[]) data
+         * (byte[]) optionalData
+         */
+
+        int totalLength = dataLength + optionalDataLength +
+                          LOG_RECORD_FIXED_OVERHEAD_SIZE;
+
+        synchronized (listLatch) {
+            if (currentDirtyBuffer == null) {
+                switchDirtyBuffer();
+                // either sets the currentDirtyBuffer to a buffer
+                // element or throws a LogBufferFullException
+            }
+
+            // switch buffer if current buffer does not have enough space
+            // for the incoming data
+            if (totalLength > currentDirtyBuffer.freeSize()) {
+                switchDirtyBuffer();
+            }
+
+            if (totalLength <= currentDirtyBuffer.freeSize()) {
+                currentDirtyBuffer.appendLogRecord(instant,
+                                                   dataLength,
+                                                   dataOffset,
+                                                   optionalDataLength,
+                                                   optionalDataOffset,
+                                                   data,
+                                                   optionalData);
+            } else {
+                // The log record requires more space than one
+                // LogBufferElement with default size. Create a new big
+                // enough LogBufferElement
+                LogBufferElement current = new LogBufferElement(totalLength);
+                current.setRecyclable(false);
+                current.appendLogRecord(instant,
+                                        dataLength,
+                                        dataOffset,
+                                        optionalDataLength,
+                                        optionalDataOffset,
+                                        data,
+                                        optionalData);
+                dirtyBuffers.addLast(current);
+                // currentDirtyBuffer has already been handed over to
+                // the dirtyBuffers list, and an empty one is in
+                // place, so no need to touch currentDirtyBuffer here
+            }
+        }
+    }
+
+    /**
+     * Sets the output data to that of the next (oldest) buffer
+     * element in dirtyBuffers so that getData(), getLastInstant() and
+     * getSize() return values from the next oldest chunk of log. Used
+     * by the log consumer (the LogShipping service) to move to the
+     * next chunk of log in the buffer.
+     *
+     * @return true if there is log in the buffer, resulting in valid
+     * data for the get-methods
+     */
+    public boolean next() {
+        synchronized (listLatch) {
+
+            if (dirtyBuffers.size() == 0) {
+                // if the current buffer has been written to, and
+                // there are no other dirty buffers, it should be
+                // moved to the dirtyBuffer list so that it can be
+                // returned.
+                try {
+                    switchDirtyBuffer();
+                } catch (LogBufferFullException lbfe) {
+                    // should not be possible when dirtyBuffers.size() == 0
+                    if (SanityManager.DEBUG){
+                        SanityManager.THROWASSERT(
+                            "Unexpected LogBufferFullException when trying "+
+                            "to remove elements from the buffer", lbfe);
+                    }
+                }
+            }
+
+            synchronized (outputLatch) {
+                if (dirtyBuffers.size() > 0 ) {
+                    LogBufferElement current =
+                        (LogBufferElement)dirtyBuffers.removeFirst();
+
+                    if (outBufferData.length < current.size()) {
+                        // resize outBufferData if it has too few bytes
+                        outBufferData = new byte[current.size()];
+                    } else if(outBufferData.length != defaultBufferSize) {
+                        // the buffer should be resized to default if it has
+                        // previously been increased
+                        int newSize = defaultBufferSize;
+                        if (current.size() > newSize) {
+                            // can not be smaller than current LogBufferElement
+                            newSize = current.size();
+                        }
+                        outBufferData = new byte[newSize];
+                    }
+
+                    // set the outBuffer data
+                    System.arraycopy(current.getData(), 0, outBufferData, 0,
+                                     current.size());
+                    outBufferStored = current.size();
+                    outBufferLastInstant = current.getLastInstant();
+
+                    // recycle = false if the LogBufferElement has been
+                    // used to store a single very big log record
+                    if (current.isRecyclable()) {
+                        freeBuffers.addLast(current);
+                    }
+
+                    validOutBuffer = true;
+                } else {
+                    validOutBuffer = false; // no more dirty data to get
+                }
+                // Read the flag while still holding outputLatch; an
+                // unguarded read could see a concurrent next()'s update.
+                return validOutBuffer;
+            }
+        }
+    }
+
+    /**
+     * Returns a byte[] containing a chunk of serialized log records.
+     * Always returns the log that was oldest at the time next() was
+     * called last time. Use next() to move to the next chunk of log
+     * records.
+     *
+     * @return A copy of the current byte[], which is a chunk of log
+     * @throws NoSuchElementException if there was no log in the
+     * buffer the last time next() was called.
+     */
+    public byte[] getData() throws NoSuchElementException{
+        synchronized (outputLatch) {
+            byte [] b = new byte[getSize()]; // getSize() throws if no valid data
+            if (validOutBuffer) {
+                System.arraycopy(outBufferData, 0, b, 0, getSize());
+                return b;
+            } else
+                throw new NoSuchElementException();
+        }
+    }
+
+    /**
+     * Method to determine whether or not the buffer had log record
+     * the last time next() was called.
+     *
+     * @return true if the buffer contained log records the last time
+     * next() was called. False if not, or if next() has not been
+     * called yet.
+     */
+    public boolean validData() {
+        synchronized (outputLatch) {
+            return validOutBuffer;
+        }
+    }
+
+    /**
+     * @return The number of bytes returned by getData
+     * @throws NoSuchElementException if there was no log in the
+     * buffer the last time next() was called.
+     */
+    public int getSize() throws NoSuchElementException{
+        synchronized (outputLatch) {
+            if (validOutBuffer) {
+                return outBufferStored;
+            } else {
+                throw new NoSuchElementException();
+            }
+        }
+    }
+
+    /**
+     * Can be used so that only the necessary log records are sent
+     * when a flush(LogInstant flush_to_this) is called in the log
+     * factory.
+     *
+     * @return The highest log instant in the chunk of log returned by
+     * getData().
+     * @throws NoSuchElementException if there was no log in the
+     * buffer the last time next() was called.
+     */
+    public long getLastInstant() throws NoSuchElementException{
+        synchronized (outputLatch) {
+            if (validOutBuffer) {
+                return outBufferLastInstant;
+            } else {
+                throw new NoSuchElementException();
+            }
+        }
+    }
+
+    /**
+     * Appends the currentDirtyBuffer to dirtyBuffers, and makes a
+     * fresh buffer element from freeBuffers the currentDirtyBuffer.
+     * Note: this method is not synchronized since all uses of it is
+     * inside synchronized(listLatch) code blocks.
+     *
+     * @throws LogBufferFullException if the freeBuffers list is empty
+     */
+    private void switchDirtyBuffer() throws LogBufferFullException{
+        // first, move currentDirtyBuffer to dirtyBuffers list.
+        // do not switch if current buffer is empty
+        if (currentDirtyBuffer != null && currentDirtyBuffer.size() > 0) {
+            dirtyBuffers.addLast(currentDirtyBuffer);
+            currentDirtyBuffer = null;
+        }
+
+        // second, make a buffer element from the freeBuffers list the
+        // new currentDirtyBuffer. If currentDirtyBuffer != null, it
+        // is empty and has therefore not been moved to dirtyBuffers
+        if (currentDirtyBuffer == null) {
+            try {
+                currentDirtyBuffer =
+                    (LogBufferElement)freeBuffers.removeFirst();
+                currentDirtyBuffer.init();
+            } catch (NoSuchElementException nsee) {
+                throw new LogBufferFullException();
+            }
+        }
+    }
+
+}

Propchange: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message