Author: oysteing Date: Fri Nov 16 18:54:30 2007 New Revision: 595900 URL: http://svn.apache.org/viewvc?rev=595900&view=rev Log: (empty) Added: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/ReplicationLogger.java Modified: db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/slave/SlaveFactory.java db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/SlaveController.java db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/SQLState.java Modified: db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/slave/SlaveFactory.java URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/slave/SlaveFactory.java?rev=595900&r1=595899&r2=595900&view=diff ============================================================================== --- db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/slave/SlaveFactory.java (original) +++ db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/slave/SlaveFactory.java Fri Nov 16 18:54:30 2007 @@ -27,7 +27,6 @@ import org.apache.derby.iapi.store.raw.RawStoreFactory; import org.apache.derby.iapi.store.raw.log.LogFactory; -import org.apache.derby.iapi.store.raw.data.DataFactory; /** *
@@ -50,10 +49,18 @@
/* Strings used as keys in the Properties objects*/
+ /** Property key to specify which host to listen to */
+ public static final String SLAVE_HOST =
+ Property.PROPERTY_RUNTIME_PREFIX + "replication.slave.slavehost";
+
/** Property key to specify which port to listen to */
public static final String SLAVE_PORT =
Property.PROPERTY_RUNTIME_PREFIX + "replication.slave.slaveport";
+ /** Property key to specify the name of the database */
+ public static final String SLAVE_DB =
+ Property.PROPERTY_RUNTIME_PREFIX + "replication.slave.dbname";
+
/** Property key to specify replication mode */
public static final String REPLICATION_MODE =
Property.PROPERTY_RUNTIME_PREFIX + "replication.slave.mode";
@@ -78,8 +85,12 @@
*
* @param rawStore The RawStoreFactory for the database
* @param logFac The LogFactory ensuring recoverability for this database
+ *
+ * @exception StandardException Thrown if the slave could not be
+ * started.
*/
- public void startSlave(RawStoreFactory rawStore, LogFactory logFac);
+ public void startSlave(RawStoreFactory rawStore, LogFactory logFac)
+ throws StandardException;
/**
* Will perform all work that is needed to stop replication
Added: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/ReplicationLogger.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/ReplicationLogger.java?rev=595900&view=auto
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/ReplicationLogger.java (added)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/ReplicationLogger.java Fri Nov 16 18:54:30 2007
@@ -0,0 +1,71 @@
+/*
+
+ Derby - Class
+ org.apache.derby.impl.services.replication.ReplicationLogger
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to you under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+ */
+
+package org.apache.derby.impl.services.replication;
+
+import org.apache.derby.iapi.reference.MessageId;
+import org.apache.derby.iapi.services.context.ErrorStringBuilder;
+import org.apache.derby.iapi.services.monitor.Monitor;
+
+public abstract class ReplicationLogger {
+
+ /** Whether or not to print log messages to derby.log. Defaults to
+ * true, but can be set to false with derby property
+ * "derby.replication.logerrormessages=false"
+ */
+ // TODO: make this configurable through the aforementioned
+ // property
+ private static final boolean LOG_REPLICATION_MESSAGES = true;
+
+
+ /**
+ * Print error message and the stack trace of the throwable to the
+ * log (usually derby.log) provided that LOG_REPLICATION_MESSAGES
+ * is true. If LOG_REPLICATION_MESSAGES is false, nothing is
+ * logged.
+ *
+ * @param msgId The error message id
+ * @param t Error trace starts from this error
+ * @param dbname The name of the replicated database
+ */
+ protected void logError(String msgId, Throwable t, String dbname) {
+
+ if (LOG_REPLICATION_MESSAGES) {
+
+ Monitor.logTextMessage(MessageId.REPLICATION_ERROR_BEGIN);
+
+ if (msgId != null) {
+ Monitor.logTextMessage(msgId, dbname);
+ }
+
+ if (t != null) {
+ ErrorStringBuilder esb =
+ new ErrorStringBuilder(Monitor.getStream().getHeader());
+ esb.stackTrace(t);
+ Monitor.logMessage(esb.get().toString());
+ esb.reset();
+ }
+ Monitor.logTextMessage(MessageId.REPLICATION_ERROR_END);
+ }
+ }
+
+}
Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java?rev=595900&r1=595899&r2=595900&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java Fri Nov 16 18:54:30 2007
@@ -60,12 +60,17 @@
* @param portNumber an integer that contains the port number of the
* slave to replicate to.
*
- * @throws UnknownHostException If an exception occurs while trying to
- * resolve the host name.
+ * @throws StandardException If an exception occurs while trying to
+ * resolve the host name.
*/
public ReplicationMessageReceive(String hostName, int portNumber)
- throws UnknownHostException {
- slaveAddress = new SlaveAddress(hostName, portNumber);
+ throws StandardException {
+ try {
+ slaveAddress = new SlaveAddress(hostName, portNumber);
+ } catch (UnknownHostException uhe) {
+ throw StandardException.newException
+ (SQLState.REPLICATION_CONNECTION_EXCEPTION, uhe);
+ }
}
/**
@@ -73,6 +78,12 @@
* for connections from the master and verify compatibility
* with the database version of the master.
*
+ * @param timeout The amount of time, in milliseconds, this method
+ * will wait for a connection to be established. If no connection
+ * has been established before the timeout, a
+ * PrivilegedActionException is thrown with cause
+ * java.net.SocketTimeoutException
+ *
* @throws PrivilegedActionException if an exception occurs while trying
* to open a connection.
*
@@ -84,7 +95,7 @@
* @throws StandardException if an incompatible database version is found.
*
*/
- public void initConnection() throws
+ public void initConnection(int timeout) throws
PrivilegedActionException,
IOException,
StandardException,
@@ -93,6 +104,7 @@
//Contains the ServerSocket used to listen for
//connections from the replication master.
final ServerSocket serverSocket = createServerSocket();
+ serverSocket.setSoTimeout(timeout);
//Start listening on the socket and accepting the connection
Socket client =
Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/SlaveController.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/SlaveController.java?rev=595900&r1=595899&r2=595900&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/SlaveController.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/SlaveController.java Fri Nov 16 18:54:30 2007
@@ -23,15 +23,23 @@
package org.apache.derby.impl.services.replication.slave;
import org.apache.derby.iapi.error.StandardException;
+import org.apache.derby.iapi.reference.MessageId;
import org.apache.derby.iapi.reference.SQLState;
import org.apache.derby.iapi.services.monitor.ModuleControl;
import org.apache.derby.iapi.services.monitor.ModuleSupportable;
+import org.apache.derby.iapi.services.monitor.Monitor;
import org.apache.derby.iapi.store.raw.RawStoreFactory;
import org.apache.derby.iapi.store.raw.log.LogFactory;
+import org.apache.derby.impl.store.raw.log.LogToFile;
+import org.apache.derby.impl.services.replication.ReplicationLogger;
+import org.apache.derby.impl.services.replication.net.ReplicationMessage;
+import org.apache.derby.impl.services.replication.net.ReplicationMessageReceive;
import org.apache.derby.iapi.services.replication.slave.SlaveFactory;
+import java.io.EOFException;
+import java.net.SocketTimeoutException;
import java.util.Properties;
/**
@@ -48,15 +56,39 @@
*
* @see SlaveFactory
*/
-public class SlaveController implements SlaveFactory, ModuleControl,
- ModuleSupportable {
+public class SlaveController extends ReplicationLogger
+ implements SlaveFactory, ModuleControl, ModuleSupportable {
+
+
+ // How long to wait for a connection to be established with the
+ // master before timing out. Note that this is done so that we can
+ // detect if replication slave mode has been stopped. If
+ // replication mode has not been stopped, a new attempt is made to
+ // set up the connection.
+ // TODO: make this configurable through a property
+ private static final int DEFAULT_SOCKET_TIMEOUT = 1000; // 1 second
private RawStoreFactory rawStoreFactory;
- private LogFactory logFactory;
- // waiting for code to go into trunk:
- // private NetworkReceive connection;
+ private LogToFile logToFile;
+ private ReplicationMessageReceive receiver;
+ private String slavehost;
private int slaveport;
+ private String dbname; // The name of the replicated database
+
+ // Whether or not replication slave mode is still on. Will be set
+ // to false when slave replication is shut down. The value of this
+ // variable is checked after every timeout when trying to set up a
+ // connection to the master, and by the thread that applies log
+ // chunks received from the master.
+ private volatile boolean inReplicationSlaveMode = true;
+
+ // Used to parse chunks of log records received from the master.
+ private ReplicationLogScan logScan;
+
+ // Thread that listens for log chunk messages from the master, and
+ // applies these to the local log
+ private SlaveLogReceiverThread logReceiverThread;
/**
* Empty constructor required by Monitor.bootServiceModule
@@ -71,8 +103,6 @@
* Used by Monitor.bootServiceModule to start the service. It will
* set up basic variables
*
- * Not implemented yet
- *
* @param create Currently ignored
* @param properties Properties used to start the service in the
* correct mode
@@ -82,15 +112,14 @@
public void boot(boolean create, Properties properties)
throws StandardException {
+ slavehost = properties.getProperty(SlaveFactory.SLAVE_HOST);
+
String port = properties.getProperty(SlaveFactory.SLAVE_PORT);
if (port != null) {
slaveport = new Integer(port).intValue();
}
- // Added when Network Service has been committed to trunk
- // connection = new NetworkReceive();
-
- System.out.println("SlaveController booted");
+ dbname = properties.getProperty(SlaveFactory.SLAVE_DB);
}
/**
@@ -137,28 +166,54 @@
/**
* Start slave replication. This method establishes a network
* connection with the associated replication master and starts a
- * daemon that applies operations received from the master (in the
+ * thread that applies operations received from the master (in the
* form of log records) to the local slave database.
*
- * Not implemented yet
- *
* @param rawStore The RawStoreFactory for the database
- * @param logFac The LogFactory ensuring recoverability for this database
+ * @param logFac The LogFactory ensuring recoverability for this
+ * database
+ *
+ * @exception StandardException Thrown if the slave could not be
+ * started.
*/
- public void startSlave(RawStoreFactory rawStore, LogFactory logFac) {
- // Added when Network Service has been committed to trunk:
- // connection.connect(); // sets up a network connection to the slave
+ public void startSlave(RawStoreFactory rawStore, LogFactory logFac)
+ throws StandardException {
+
+ // Keep trying to set up a connection with the master until a
+ // connection has been established or until we are no longer
+ // in replication slave mode
+ receiver = new ReplicationMessageReceive(slavehost, slaveport);
+ while (!setupConnection()) {
+ if (!inReplicationSlaveMode) {
+ // If we get here, another thread has called
+ // stopSlave() while we waited for a connection with
+ // the master. The thread shutting the slave down will
+ // clean up anything we did during setupConnection, so
+ // simply return.
+ return;
+ }
+ }
+
+ // Set up the log scan used to parse chunks of log received
+ // from the master
+ logScan = new ReplicationLogScan();
rawStoreFactory = rawStore;
- logFactory = logFac;
- // Add code that initializes replication by setting up a
- // network connection with the master, receiving the database
- // from the master, make a DaemonService for applying log
- // records etc. Repliation should be up and running when this
- // method returns.
+ try {
+ logToFile = (LogToFile)logFac;
+ } catch (ClassCastException cce) {
+ // Since there are only two implementing classes of
+ // LogFactory, the class type has to be ReadOnly if it is
+ // not LogToFile.
+ throw StandardException.newException(
+ SQLState.CANNOT_REPLICATE_READONLY_DATABASE);
+ }
+
+ logToFile.initializeReplicationSlaveRole();
+ startLogReceiverThread();
- System.out.println("SlaveController started");
+ Monitor.logTextMessage(MessageId.REPLICATION_SLAVE_STARTED, dbname);
}
/**
@@ -167,7 +222,10 @@
* Not implemented yet
*/
public void stopSlave() {
- System.out.println("SlaveController stopped");
+ inReplicationSlaveMode = false;
+
+ // todo: shutdown slave
+ Monitor.logTextMessage(MessageId.REPLICATION_SLAVE_STOPPED, dbname);
}
/**
@@ -200,7 +258,7 @@
// this database. The database can be connected to after this.
// // complete recovery of the database
- // logFactory.setReplicationMode(false);
+ // logToFile.setReplicationMode(false);
// Added when Network Service has been committed to trunk:
// connection.shutdown();
@@ -208,5 +266,216 @@
System.out.println("SlaveController failover");
}
+ ////////////////////////////////////////////////////////////
+ // Private Methods //
+ ////////////////////////////////////////////////////////////
+
+ /**
+ * Establish a connection with the replication master. Listens for
+ * a connection on the slavehost/port for DEFAULT_SOCKET_TIMEOUT
+ * milliseconds.
+ *
+ * @return true if a connection has been set up with the master,
+ * false if the connection attempt timed out.
+ *
+ * @exception StandardException if an unexpected exception occurred
+ * that prevented a connection with the master.
+ */
+ private boolean setupConnection() throws StandardException {
+
+ try {
+ // timeout to check if still in replication slave mode
+ receiver.initConnection(DEFAULT_SOCKET_TIMEOUT);
+ return true; // will not reach this if timeout
+ } catch (StandardException se) {
+ throw se;
+ } catch (Exception e) {
+ // SocketTimeoutException is wrapped in
+ // PrivilegedActionException.
+ Throwable cause = e.getCause();
+ if (cause instanceof SocketTimeoutException) {
+ // Timeout!
+ return false;
+ } else {
+ throw StandardException.newException
+ (SQLState.REPLICATION_CONNECTION_EXCEPTION, e, dbname);
+ }
+ }
+ }
+
+ /**
+ * Write the reason for the lost connection to the log (derby.log)
+ * and reconnect with the master. Once the network is up and
+ * running, a new LogReceiverThread is started. The method returns
+ * without doing anything if inReplicationSlaveMode=false, which
+ * means that stopSlave() has been called by another thread.
+ *
+ * @param eofe The reason the connection to the master was lost
+ */
+
+ private void handleDisconnect(EOFException eofe) {
+ if (!inReplicationSlaveMode) {
+ return;
+ }
+
+ logError(MessageId.REPLICATION_SLAVE_LOST_CONN, eofe, dbname);
+
+ try {
+ while (!setupConnection()) {
+ if (!inReplicationSlaveMode) {
+ // stopSlave may have been called, turning
+ // replication slave mode off. Simply return if
+ // that is the case. The thread that called
+ // stopSlave will clean up everything.
+ return;
+ }
+ }
+
+ startLogReceiverThread();
+ } catch (StandardException se) {
+ handleFatalException(se);
+ }
+ }
+
+ /**
+ * Starts the LogReceiverThread that will listen for chunks of log
+ * records from the master and apply the log records to the local
+ * log file.
+ */
+ private void startLogReceiverThread() {
+ logReceiverThread = new SlaveLogReceiverThread();
+ logReceiverThread.start();
+ }
+
+ /**
+ * Handles fatal errors for slave replication functionality. These
+ * are errors that require us to stop replication. Calling this
+ * method has the following effects:
+ *
+ * 1) Debug messages are written to the log file (usually
+ * derby.log) if ReplicationLogger#LOG_REPLICATION_MESSAGES is
+ * true.
+ *
+ * 2) If the network connection is up, the master is notified of
+ * the problem.
+ *
+ * 3) All slave replication functionality is stopped, and the
+ * database is then shut down without being booted.
+ *
+ * The method will return without doing anything if
+ * inReplicationSlaveMode=false, meaning that stopSlave has been
+ * called.
+ *
+ * @param e The fatal exception that is the reason for calling
+ * this method
+ */
+ private void handleFatalException(Exception e) {
+ // If inReplicationSlaveMode is false, the stopSlave method in
+ // this controller has already been called. If so, we ignore
+ // this fatal error.
+ if (!inReplicationSlaveMode) {
+ return;
+ }
+
+ logError(MessageId.REPLICATION_FATAL_ERROR, e, dbname);
+
+ // todo: notify master of the problem
+ // todo: rawStoreFactory.stopReplicationSlave();
+ }
+
+
+ ///////////////////////////////////////////////////////////////////////////
+ // Inner Class - Thread used to apply chunks of log received from master //
+ ///////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Thread that listens for incoming messages from the master and
+ * applies chunks of log records to the local log files.
+ */
+ private class SlaveLogReceiverThread extends Thread {
+ public void run() {
+ // Debug only - println will be removed
+ System.out.println("Started log receiver thread");
+ try {
+ ReplicationMessage message;
+ while (inReplicationSlaveMode) {
+ message = receiver.readMessage();
+
+ switch (message.getType()){
+ case ReplicationMessage.TYPE_LOG:
+ byte[] logChunk = (byte[])message.getMessage();
+ handleLogChunk(logChunk);
+ break;
+ default:
+ // debug; will be removed
+ System.out.println("Not handling non-log messages yet "
+ +"- got a type "+message.getType());
+ break;
+ }
+ }
+
+ } catch (EOFException eofe) {
+ // Network connection with master has been lost.
+ handleDisconnect(eofe);
+ } catch (StandardException se) {
+ handleFatalException(se);
+ } catch (Exception e) {
+ // Exceptions not caused by disconnect are unexpected,
+ // and therefore fatal
+ StandardException se =
+ StandardException.newException
+ (SQLState.REPLICATION_UNEXPECTED_EXCEPTION, e);
+ handleFatalException(se);
+ }
+ }
+
+ /**
+ * Parses a chunk of log received from the master, and applies
+ * the individual log records to the local log file.
+ *
+ * @param logChunk A chunk of log records received from the
+ * master
+ * @exception StandardException If the chunk of log records
+ * could not be parsed or the local log file is out of synch
+ * with the master log file.
+ */
+ private void handleLogChunk(byte[] logChunk)
+ throws StandardException{
+ logScan.init(logChunk);
+
+ while (logScan.next()){
+ if (logScan.isLogFileSwitch()) {
+ logToFile.switchLogFile();
+ } else {
+
+ long localInstant = logToFile.
+ appendLogRecord(logScan.getData(),
+ 0,
+ logScan.getDataLength(),
+ null,
+ 0,
+ 0);
+
+ // If the log instant of the received log does not
+ // match with the local log instant, the log
+ // records are not written to the same physical
+ // location in the log files. This is fatal since
+ // log records are identified by their physical
+ // location in the log files.
+ if (logScan.getInstant() != localInstant) {
+ throw StandardException.newException
+ (SQLState.REPLICATION_LOG_OUT_OF_SYNCH,
+ dbname,
+ new Long(logScan.getInstant()),
+ new Long(localInstant));
+
+ }
+ }
+ }
+ }
+ }
+ ///////////////////////////////////////////////////////////
+ // END Inner Class //
+ ///////////////////////////////////////////////////////////
}
Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java?rev=595900&r1=595899&r2=595900&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java Fri Nov 16 18:54:30 2007
@@ -357,9 +357,9 @@
// switchLogFile (both in slave replication mode and after the
// database has been fully booted) when a new log file is
// allocated.
- long logFileNumber = -1; // current log file number
- // other than during boot and recovery time,
- // and by initializeSlaveReplication if in
+ long logFileNumber = -1; // current log file number.
+ // Other than during boot and recovery time,
+ // and during initializeReplicationSlaveRole if in
// slave replication mode,
// logFileNumber is only changed by
// switchLogFile, which is synchronized.
@@ -1904,7 +1904,7 @@
MT - log factory is single threaded thru a log file switch, the log is frozen for the duration of the switch */ - private void switchLogFile() throws StandardException + public void switchLogFile() throws StandardException { boolean switchedOver = false; @@ -2838,7 +2838,7 @@
When the database is in slave replication mode only: Assumes that only recover() will call this method after - initializeSlaveReplication() has been called, and until slave + initializeReplicationSlaveRole() has been called, and until slave replication has ended. If this changes, the current implementation will fail.
@@ -2849,25 +2849,35 @@ throws IOException, StandardException { - //