db-derby-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oyste...@apache.org
Subject svn commit: r603260 - in /db/derby/code/trunk/java: engine/org/apache/derby/iapi/services/replication/master/ engine/org/apache/derby/impl/services/replication/master/ engine/org/apache/derby/impl/services/replication/net/ engine/org/apache/derby/loc/ ...
Date Tue, 11 Dec 2007 14:37:49 GMT
Author: oysteing
Date: Tue Dec 11 06:37:47 2007
New Revision: 603260

URL: http://svn.apache.org/viewvc?rev=603260&view=rev
Log:
Derby-3064: Patch that handles the integration of the log shipper and the master controller.
Contributed by Narayanan.

Modified:
    db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/master/MasterFactory.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java
    db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml
    db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java
    db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/SQLState.java

Modified: db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/master/MasterFactory.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/master/MasterFactory.java?rev=603260&r1=603259&r2=603260&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/master/MasterFactory.java
(original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/master/MasterFactory.java
Tue Dec 11 06:37:47 2007
@@ -81,6 +81,10 @@
     /** Property key to specify which slave port to connect to */
     public static final String SLAVE_PORT =
         Property.PROPERTY_RUNTIME_PREFIX + "replication.master.slaveport";
+    
+    /** Property key to specify the name of the database */
+    public static final String MASTER_DB =
+        Property.PROPERTY_RUNTIME_PREFIX + "replication.master.dbname";
 
     /** Property key to specify replication mode */
     public static final String REPLICATION_MODE =

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java?rev=603260&r1=603259&r2=603260&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java
(original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java
Tue Dec 11 06:37:47 2007
@@ -87,6 +87,12 @@
     private MasterController masterController = null;
     
     /**
+     * Store the log chunk that failed during a previous shipping attempt
+     * so that it can be re-shipped to the slave.
+     */
+    private ReplicationMessage failedChunk = null;
+    
+    /**
      * Constructor initializes the log buffer, the replication message
      * transmitter, the shipping interval and the master controller.
      *
@@ -108,6 +114,7 @@
         this.transmitter = transmitter;
         this.shippingInterval = shippingInterval;
         this.masterController = masterController;
+        this.stopShipping = false;
     }
     
     /**
@@ -126,8 +133,8 @@
                     wait(shippingInterval);
                 }
             } catch (InterruptedException ie) {
-                //Ignore the Interrupted exception to enable stopping
-                //the shipping thread in a controlled way.
+                //Interrupt the log shipping thread.
+                return;
             } catch (IOException ioe) {
                 masterController.handleExceptions(ioe);
             } catch (StandardException se) {
@@ -149,11 +156,22 @@
     private synchronized void shipALogChunk()
     throws IOException, StandardException {
         byte [] logRecords = null;
+        ReplicationMessage mesg = null;
         try {
+            //Check to see if a previous log record exists that needs
+            //to be re-transmitted. If there is then transmit that
+            //log record and then transmit the next log record in the
+            //log buffer.
+            if (failedChunk != null) {
+                transmitter.sendMessage(failedChunk);
+                failedChunk = null;
+            }
+            //transmit the log record that is at the head of
+            //the log buffer.
             if (logBuffer.next()) {
                 logRecords = logBuffer.getData();
                 
-                ReplicationMessage mesg = new ReplicationMessage(
+                mesg = new ReplicationMessage(
                     ReplicationMessage.TYPE_LOG, logRecords);
                 
                 transmitter.sendMessage(mesg);
@@ -164,6 +182,11 @@
             //buffer.
             masterController.handleExceptions(StandardException.newException
                 (SQLState.REPLICATION_UNEXPECTED_EXCEPTION, nse));
+        } catch (IOException ioe) {
+            //An exception occurred while transmitting the log record.
+            //Store the previous log record so that it can be re-transmitted
+            failedChunk = (mesg==null) ? failedChunk : mesg;
+            throw ioe;
         }
     }
     
@@ -179,7 +202,9 @@
      *                           log records from the log buffer.
      */
     public void forceFlush() throws IOException, StandardException {
-        shipALogChunk();
+        if (!stopShipping) {
+            shipALogChunk();
+        }
         
         synchronized(this) {
             //There will still be more log to send after the forceFlush

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java?rev=603260&r1=603259&r2=603260&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java
(original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java
Tue Dec 11 06:37:47 2007
@@ -22,16 +22,22 @@
 
 package org.apache.derby.impl.services.replication.master;
 
+import java.io.IOException;
+import java.net.SocketTimeoutException;
 import org.apache.derby.iapi.error.StandardException;
+import org.apache.derby.iapi.reference.MessageId;
 import org.apache.derby.iapi.reference.SQLState;
 import org.apache.derby.iapi.services.monitor.ModuleControl;
 import org.apache.derby.iapi.services.monitor.ModuleSupportable;
+import org.apache.derby.iapi.services.monitor.Monitor;
 
 import org.apache.derby.iapi.store.raw.RawStoreFactory;
 import org.apache.derby.iapi.store.raw.log.LogFactory;
 import org.apache.derby.iapi.store.raw.data.DataFactory;
 
 import org.apache.derby.iapi.services.replication.master.MasterFactory;
+
+import org.apache.derby.impl.services.replication.ReplicationLogger;
 import org.apache.derby.impl.services.replication.net.ReplicationMessageTransmit;
 import org.apache.derby.impl.services.replication.buffer.ReplicationLogBuffer;
 import org.apache.derby.impl.services.replication.buffer.LogBufferFullException;
@@ -52,8 +58,8 @@
  *
  * @see MasterFactory
  */
-public class MasterController implements MasterFactory, ModuleControl,
-                                         ModuleSupportable {
+public class MasterController extends ReplicationLogger 
+        implements MasterFactory, ModuleControl, ModuleSupportable {
 
     private static final int DEFAULT_LOG_BUFFER_SIZE = 32768; //32K
 
@@ -61,14 +67,21 @@
     private DataFactory dataFactory;
     private LogFactory logFactory;
     private ReplicationLogBuffer logBuffer;
-    // waiting for code to go into trunk:
-    //    private LogShipper logShipper; 
+    private AsynchronousLogShipper logShipper;
     private ReplicationMessageTransmit transmitter; 
 
     private String replicationMode;
     private String slavehost;
     private int slaveport;
+    private String dbname;
+    
+    //Set to true when stopMaster is called
+    private boolean stopMasterController = false;
 
+    //How long to wait before reporting the failure to
+    //establish a connection with the slave.
+    // TODO: make this configurable through a property
+    private static final int SLAVE_CONNECTION_ATTEMPT_TIMEOUT = 5000;
 
 
     /**
@@ -107,28 +120,9 @@
             slaveport = new Integer(port).intValue();
         }
 
-        System.out.println("MasterController booted");
-    }
-
-    private boolean setupConnection(){
-        try {
-            transmitter = new ReplicationMessageTransmit(slavehost, slaveport);
-            transmitter.initConnection();
-            return true;
-        } catch (Exception e) {
-            // printline used for debugging - will be removed
-            System.out.println("(MC) Got an exception during setupConnection:");
-            return false;
-        }
+        dbname = properties.getProperty(MasterFactory.MASTER_DB);
     }
 
-    /**
-     * Will stop the replication master service
-     *
-     * Not implemented yet
-     */
-    public void stop() { }
-
     ////////////////////////////////////////////////////////////////
     // Implementation of methods from interface ModuleSupportable //
     ////////////////////////////////////////////////////////////////
@@ -158,15 +152,20 @@
             return false;
         }
     }
+    
+    /**
+     * Will stop the replication master service
+     *
+     * Not implemented yet
+     */
+    public void stop() { }
 
     ////////////////////////////////////////////////////////////
     // Implementation of methods from interface MasterFactory //
     ////////////////////////////////////////////////////////////
 
     /**
-     * Will perform all the work that is needed to set up replication
-     *
-     * Not implemented yet
+     * Will perform all the work that is needed to set up replication.
      *
      * @param rawStore The RawStoreFactory for the database
      * @param dataFac The DataFactory for this database
@@ -175,25 +174,24 @@
      * thrown on replication startup error. 
      */
     public void startMaster(RawStoreFactory rawStore,
-                            DataFactory dataFac, LogFactory logFac)
-        throws StandardException{
-        // Added when Network Service has been committed to trunk:
-        // connection.connect(); // sets up a network connection to the slave
-
+                            DataFactory dataFac, LogFactory logFac) 
+                            throws StandardException {
+        stopMasterController = false;
         rawStoreFactory = rawStore;
         dataFactory = dataFac;
         logFactory = logFac;
         logBuffer = new ReplicationLogBuffer(DEFAULT_LOG_BUFFER_SIZE);
 
-        // May want to move this below connectblock later when
-        // database is not filesystem copied to slave. 
         logFactory.startReplicationMasterRole(this);
 
+        setupConnection();
+
         if (replicationMode.equals(MasterFactory.ASYNCHRONOUS_MODE)) {
-            System.out.println("MasterController would now " +
-                               "start asynchronous log shipping");
-            // Added when Master Log Shipping code has been committed to trunk:
-            // logShipper = new AsynchronousLogShipper(connection);
+            logShipper = new AsynchronousLogShipper(logBuffer,
+                                                    transmitter,
+                                                    1000,
+                                                    this);
+            ((Thread)logShipper).start();
         }
 
         // Add code that initializes replication by sending the
@@ -201,24 +199,19 @@
         // the buffer etc. Repliation should be up and running when
         // this method returns.
 
-        System.out.println("MasterController started");
+        Monitor.logTextMessage(MessageId.REPLICATION_MASTER_STARTED, dbname);
     }
 
     /**
      * Will perform all work that is needed to shut down replication
-     *
-     * Not implemented yet
      */
     public void stopMaster() {
-        // logFactory.stopReplicationLogging(); // added later
-
-        // Added when Network Service has been committed to trunk:
-        // if (connection.isUp()) {
-        //     logShipper.flushAllLog();
-        // }
-
-        // logBuffer.stop();
-        System.out.println("MasterController stopped");
+        stopMasterController = true;
+        //interrupt the periodic shipper first
+        logShipper.interrupt();
+        //This would ensure that any further call to forceFlush fails.
+        logShipper.stopLogShipment();
+        Monitor.logTextMessage(MessageId.REPLICATION_MASTER_STOPPED, dbname);
     }
 
     /**
@@ -237,13 +230,13 @@
         try {
             logBuffer.appendLog(greatestInstant, log, logOffset, logLength);
         } catch (LogBufferFullException lbfe) {
-            // Waiting for log shipper to implement this
-            // We have multiple alternatives: 
-            //  1) Try to force-send some log to the slave:
-            //     logShipper.forceFlush()
-            //  2) Increase the size of the buffer
-            // Stop replication if both these are unsuccessful or not
-            // an alternative. 
+            try {
+                logShipper.forceFlush();
+            } catch (IOException ioe) {
+                printStackAndStopMaster(ioe);
+            } catch (StandardException se) {
+                printStackAndStopMaster(se);
+            }
         }
     }
 
@@ -272,16 +265,72 @@
      * @see LogFactory#flush
      */
     public void flushedTo(long instant) {
-        // logShipper.flushedTo(instant); 
+        logShipper.flushedInstance(instant); 
     }
     
     /**
-     * Used by the log shipper to inform the master controller about the 
-     * exception condition that caused it to terminate unexpectedly.
+     * Connects to the slave being replicated to.
+     *
+     * @throws StandardException If a failure occurs while trying to open
+     *                           the connection to the slave.
+     */
+    private void setupConnection() throws StandardException {
+        try {
+            transmitter = new ReplicationMessageTransmit(slavehost, slaveport);
+            transmitter.initConnection(SLAVE_CONNECTION_ATTEMPT_TIMEOUT);
+        } catch (SocketTimeoutException ste) {
+            throw StandardException.newException
+                    (SQLState.REPLICATION_MASTER_TIMED_OUT, dbname);
+        } catch (IOException ioe) {
+            throw StandardException.newException
+                    (SQLState.REPLICATION_CONNECTION_EXCEPTION, ioe, dbname);
+        } catch (Exception e) {
+            throw StandardException.newException
+                    (SQLState.REPLICATION_CONNECTION_EXCEPTION, e, dbname);
+        }
+    }
+    
+    /**
+     * Used to handle the exceptions (IOException and StandardException) from 
+     * the log shipper.
      *
      * @param exception the exception which caused the log shipper to terminate
      *                  in an unexcepted manner.
      */
     void handleExceptions(Exception exception) {
+        if (exception instanceof IOException) {
+            logError(MessageId.REPLICATION_LOGSHIPPER_EXCEPTION, 
+                    exception, dbname);
+            Monitor.logTextMessage(MessageId.REPLICATION_MASTER_RECONN, dbname);
+            
+            while (!stopMasterController) {
+                try {
+                    transmitter = new ReplicationMessageTransmit
+                            (slavehost, slaveport);
+                    transmitter.initConnection
+                            (SLAVE_CONNECTION_ATTEMPT_TIMEOUT);
+                    break;
+                } catch (SocketTimeoutException ste) {
+                    continue;
+                } catch (IOException ioe) {
+                    continue;
+                } catch (Exception e) {
+                    printStackAndStopMaster(e);
+                }
+            }
+        } else if (exception instanceof StandardException) {
+            printStackAndStopMaster(exception);
+        }
+    }
+    
+    /**
+     * used to print the error stack for the given exception and
+     * stop the master.
+     *
+     * @param t the throwable that needs to be handled.
+     */
+    private void printStackAndStopMaster(Throwable t) {
+        logError(MessageId.REPLICATION_LOGSHIPPER_EXCEPTION, t, dbname);
+        stopMaster();
     }
 }

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java?rev=603260&r1=603259&r2=603260&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java
(original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java
Tue Dec 11 06:37:47 2007
@@ -21,6 +21,7 @@
 package org.apache.derby.impl.services.replication.net;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.UnknownHostException;
 import java.security.AccessController;
@@ -72,6 +73,9 @@
      * comparing the UID's of the <code>ReplicationMessage</code> classes
      * of the master and the slave.
      *
+     * @param timeout the amount of time for which the connection should
+     *                block before being established.
+     *
      * @throws PrivilegedActionException if an exception occurs while trying
      *                                   to open a connection.
      *
@@ -85,7 +89,7 @@
      * @throws ClassNotFoundException Class of a serialized object cannot
      *         be found.
      */
-    public void initConnection() throws
+    public void initConnection(int timeout) throws
         PrivilegedActionException,
         IOException,
         StandardException,
@@ -93,13 +97,19 @@
         
         Socket s = null;
         
+        final int timeout_ = timeout;
+        
         //create a connection to the slave.
         s = (Socket)
         AccessController.doPrivileged(new PrivilegedExceptionAction() {
             public Object run() throws IOException {
                 SocketFactory sf = SocketFactory.getDefault();
-                return sf.createSocket(slaveAddress.getHostAddress(),
-                    slaveAddress.getPortNumber());
+                InetSocketAddress sockAddr = new InetSocketAddress(
+                        slaveAddress.getHostAddress(), 
+                        slaveAddress.getPortNumber());
+                Socket s_temp = sf.createSocket();
+                s_temp.connect(sockAddr, timeout_);
+                return s_temp;
             }
         });
         

Modified: db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml?rev=603260&r1=603259&r2=603260&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml Tue Dec 11 06:37:47
2007
@@ -4751,6 +4751,12 @@
                 <arg>masterinstant</arg>
                 <arg>slaveinstant</arg>
             </msg>
+            
+            <msg>
+                <name>XRE06</name>
+                <text>The connection attempts to the replication slave for the database
{0} exceeded the specified timeout period.</text>
+                <arg>dbname</arg>
+            </msg>
 
 
         </family>
@@ -7568,6 +7574,29 @@
             <msg>
                 <name>R006</name>
                 <text>Lost connection with the replication master of database '{0}'.</text>
+                <arg>dbname</arg>
+            </msg>
+            
+            <msg>
+                <name>R007</name>
+                <text>Replication master role started for database '{0}'.</text>
+                <arg>dbname</arg>
+            </msg>
+            
+            <msg>
+                <name>R008</name>
+                <text>Replication master role stopped for database '{0}'.</text>
+                <arg>dbname</arg>
+            </msg>
+            
+            <msg>
+                <name>R009</name>
+                <text>Exception occurred during log shipping.</text>
+            </msg>
+            
+            <msg>
+                <name>R010</name>
+                <text>Replication master trying to reconnect to slave for database
'{0}'.</text>
                 <arg>dbname</arg>
             </msg>
         </family>

Modified: db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java?rev=603260&r1=603259&r2=603260&view=diff
==============================================================================
--- db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java
(original)
+++ db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java
Tue Dec 11 06:37:47 2007
@@ -179,5 +179,9 @@
     String REPLICATION_SLAVE_STOPPED                     = "R004";
     String REPLICATION_FATAL_ERROR                       = "R005";
     String REPLICATION_SLAVE_LOST_CONN                   = "R006";
+    String REPLICATION_MASTER_STARTED                    = "R007";
+    String REPLICATION_MASTER_STOPPED                    = "R008";
+    String REPLICATION_LOGSHIPPER_EXCEPTION              = "R009";
+    String REPLICATION_MASTER_RECONN                     = "R010";
 
 }

Modified: db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/SQLState.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/SQLState.java?rev=603260&r1=603259&r2=603260&view=diff
==============================================================================
--- db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/SQLState.java
(original)
+++ db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/SQLState.java
Tue Dec 11 06:37:47 2007
@@ -1763,5 +1763,6 @@
     String REPLICATION_UNEXPECTED_EXCEPTION                        = "XRE03";
     String REPLICATION_CONNECTION_EXCEPTION                        = "XRE04";
     String REPLICATION_LOG_OUT_OF_SYNCH                            = "XRE05";
+    String REPLICATION_MASTER_TIMED_OUT                            = "XRE06";
 }
 



Mime
View raw message