Author: oysteing Date: Fri Apr 4 07:07:50 2008 New Revision: 644716 URL: http://svn.apache.org/viewvc?rev=644716&view=rev Log: DERBY-3567: Makes ALS#forceFlush time out after 5 seconds if not able to send message. Contributed by Jorgen Loland with modifications by Oystein Grovlen Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/AsynchronousLogShipper.java db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/MasterController.java Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/AsynchronousLogShipper.java URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/AsynchronousLogShipper.java?rev=644716&r1=644715&r2=644716&view=diff ============================================================================== --- db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/AsynchronousLogShipper.java (original) +++ db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/AsynchronousLogShipper.java Fri Apr 4 07:07:50 2008 @@ -99,7 +99,7 @@ * true - stop shipping log records * false - shipping can continue without interruption. */ - private boolean stopShipping = false; + private volatile boolean stopShipping = false; /** * The master controller that initialized this log shipper. @@ -111,6 +111,13 @@ * is moved into the wait state, or while notifying it. */ private Object objLSTSync = new Object(); // LST->Log Shippper Thread + + /** Used to synchronize forceFlush calls */ + private Object forceFlushSemaphore = new Object(); + + /** The number of millis a call to forceFlush will wait before giving + * up sending a chunk of log to the slave */ + public static final int DEFAULT_FORCEFLUSH_TIMEOUT = 5000; /** * Store the log chunk that failed during a previous shipping attempt @@ -201,6 +208,10 @@ while (!stopShipping) { try { shipALogChunk(); + synchronized (forceFlushSemaphore) { + // Wake up a thread waiting for forceFlush, if any + forceFlushSemaphore.notify(); + } //calculate the shipping interval (wait time) based on the //fill information obtained from the log buffer. shippingInterval = calculateSIfromFI(); @@ -310,16 +321,20 @@ * @throws StandardException If an exception occurs while trying to read * log records from the log buffer. */ - public void forceFlush() throws IOException, StandardException { - if (!stopShipping) { - shipALogChunk(); - } - - synchronized(objLSTSync) { - //There will still be more log to send after the forceFlush - //has sent one chunk. Notify the log shipping thread that - //it is time for another send. - objLSTSync.notify(); + public void forceFlush() throws IOException, StandardException + { + if (stopShipping) return; + synchronized (forceFlushSemaphore) { + synchronized (objLSTSync) { + // Notify the log shipping thread that + // it is time for another send. + objLSTSync.notify(); + } + + try { + forceFlushSemaphore.wait(DEFAULT_FORCEFLUSH_TIMEOUT); + } catch (InterruptedException ex) { + } } } @@ -348,7 +363,7 @@ /** * Stop shipping log records. If a ship is currently in progress * it will not be interrupted, shipping will stop only after the - * current shippment is done. + * current shipment is done. */ public void stopLogShipment() { stopShipping = true; Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/MasterController.java URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/MasterController.java?rev=644716&r1=644715&r2=644716&view=diff ============================================================================== --- db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/MasterController.java (original) +++ db/derby/code/trunk/java/engine/org/apache/derby/impl/store/replication/master/MasterController.java Fri Apr 4 07:07:50 2008 @@ -413,8 +413,14 @@ } catch (LogBufferFullException lbfe) { try { logShipper.forceFlush(); - // There should now be room for this log chunk in the buffer - appendLog(greatestInstant, log, logOffset, logLength); + // Either the forceFlush succeeded in sending a chunk of log + // (making room for this log chunk in the buffer), or + // forceFlush did not succeed (in which case replication is + // stopped) + logBuffer.appendLog(greatestInstant, log, + logOffset, logLength); + } catch (LogBufferFullException lbfe2) { + printStackAndStopMaster(lbfe2); } catch (IOException ioe) { printStackAndStopMaster(ioe); } catch (StandardException se) {