db-derby-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oyste...@apache.org
Subject svn commit: r644716 - in /db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master: AsynchronousLogShipper.java MasterController.java
Date Fri, 04 Apr 2008 14:07:59 GMT
Author: oysteing
Date: Fri Apr  4 07:07:50 2008
New Revision: 644716

URL: http://svn.apache.org/viewvc?rev=644716&view=rev
Log:
DERBY-3567: Makes ALS#forceFlush time out after 5 seconds if not able to send message.
Contributed by Jorgen Loland with modifications by Oystein Grovlen

Modified:
    db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/AsynchronousLogShipper.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/MasterController.java

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/AsynchronousLogShipper.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/AsynchronousLogShipper.java?rev=644716&r1=644715&r2=644716&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/AsynchronousLogShipper.java
(original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/AsynchronousLogShipper.java
Fri Apr  4 07:07:50 2008
@@ -99,7 +99,7 @@
      * true - stop shipping log records
      * false - shipping can continue without interruption.
      */
-    private boolean stopShipping = false;
+    private volatile boolean stopShipping = false;
     
     /**
      * The master controller that initialized this log shipper.
@@ -111,6 +111,13 @@
      * is moved into the wait state, or while notifying it.
      */
     private Object objLSTSync = new Object(); // LST->Log Shippper Thread
+
+    /** Used to synchronize forceFlush calls */
+    private Object forceFlushSemaphore = new Object();
+
+    /** The number of millis a call to forceFlush will wait before giving
+     * up sending a chunk of log to the slave */
+    public static final int DEFAULT_FORCEFLUSH_TIMEOUT = 5000;
     
     /**
      * Store the log chunk that failed during a previous shipping attempt
@@ -201,6 +208,10 @@
         while (!stopShipping) {
             try {
                 shipALogChunk();
+                synchronized (forceFlushSemaphore) {
+                    // Wake up a thread waiting for forceFlush, if any
+                    forceFlushSemaphore.notify();
+                }
                 //calculate the shipping interval (wait time) based on the
                 //fill information obtained from the log buffer.
                 shippingInterval = calculateSIfromFI();
@@ -310,16 +321,20 @@
      * @throws StandardException If an exception occurs while trying to read
      *                           log records from the log buffer.
      */
-    public void forceFlush() throws IOException, StandardException {
-        if (!stopShipping) {
-            shipALogChunk();
-        }
-        
-        synchronized(objLSTSync) {
-            //There will still be more log to send after the forceFlush
-            //has sent one chunk.  Notify the log shipping thread that
-            //it is time for another send.
-            objLSTSync.notify();
+    public void forceFlush() throws IOException, StandardException 
+    {
+        if (stopShipping) return;
+        synchronized (forceFlushSemaphore) {
+            synchronized (objLSTSync) {
+                // Notify the log shipping thread that
+                // it is time for another send.
+                objLSTSync.notify();
+            }
+
+            try {
+                forceFlushSemaphore.wait(DEFAULT_FORCEFLUSH_TIMEOUT);
+            } catch (InterruptedException ex) {
+            }
         }
     }
     
@@ -348,7 +363,7 @@
     /**
      * Stop shipping log records. If a ship is currently in progress
      * it will not be interrupted, shipping will stop only after the
-     * current shippment is done.
+     * current shipment is done.
      */
     public void stopLogShipment() {
         stopShipping = true;

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/MasterController.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/MasterController.java?rev=644716&r1=644715&r2=644716&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/MasterController.java
(original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/MasterController.java
Fri Apr  4 07:07:50 2008
@@ -413,8 +413,14 @@
         } catch (LogBufferFullException lbfe) {
             try {
                 logShipper.forceFlush();
-                // There should now be room for this log chunk in the buffer
-                appendLog(greatestInstant, log, logOffset, logLength);
+                // Either the forceFlush succeeded in sending a chunk of log
+                // (making room for this log chunk in the buffer), or
+                // forceFlush did not succeed (in which case replication is
+                // stopped)
+                logBuffer.appendLog(greatestInstant, log,
+                                    logOffset, logLength);
+            } catch (LogBufferFullException lbfe2) {
+                printStackAndStopMaster(lbfe2);
             } catch (IOException ioe) {
                 printStackAndStopMaster(ioe);
             } catch (StandardException se) {



Mime
View raw message