From commits-return-9523-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Tue Nov 11 20:10:34 2008 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 5905 invoked from network); 11 Nov 2008 20:10:34 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 11 Nov 2008 20:10:34 -0000 Received: (qmail 96143 invoked by uid 500); 11 Nov 2008 20:10:41 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 96125 invoked by uid 500); 11 Nov 2008 20:10:41 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 96116 invoked by uid 99); 11 Nov 2008 20:10:41 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Nov 2008 12:10:41 -0800 X-ASF-Spam-Status: No, hits=-1998.8 required=10.0 tests=ALL_TRUSTED,FS_REPLICA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Nov 2008 20:09:23 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 531F0238895D; Tue, 11 Nov 2008 12:10:06 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r713149 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/replication/ main/java/org/apache/kahadb/store/ main/proto/ test/java/org/apache/kahadb/replication/ Date: Tue, 11 Nov 2008 20:10:05 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081111201006.531F0238895D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Tue Nov 11 12:10:04 2008 New Revision: 713149 URL: 
http://svn.apache.org/viewvc?rev=713149&view=rev Log: Added better slave synchronization handling on the master side. We now create a snapshot for each slave session and clean up the snapshot once the slave is online. Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=713149&r1=713148&r2=713149&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Tue Nov 11 12:10:04 2008 @@ -22,6 +22,9 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -52,16 +55,19 @@ import com.google.protobuf.ByteString; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; + public class ReplicationMaster implements Service, ClusterListener, ReplicationTarget { private static final Log LOG = LogFactory.getLog(ReplicationServer.class); private final ReplicationServer replicationServer; - private Object serverMutex = new Object() 
{ - }; + private Object serverMutex = new Object() {}; private TransportServer server; private CopyOnWriteArrayList sessions = new CopyOnWriteArrayList(); + + AtomicInteger nextSnapshotId = new AtomicInteger(); public ReplicationMaster(ReplicationServer replication1Server) { this.replicationServer = replication1Server; @@ -136,6 +142,10 @@ private final Transport transport; private final AtomicBoolean subscribedToJournalUpdates = new AtomicBoolean(); + + private File snapshotFile; + private HashSet journalReplicatedFiles; + private boolean online; public ReplicationSession(Transport transport) { this.transport = transport; @@ -147,6 +157,7 @@ } public void stop() throws Exception { + deleteReplicationData(); transport.stop(); } @@ -155,9 +166,11 @@ ReplicationFrame frame = (ReplicationFrame) command; switch (frame.getHeader().getType()) { case SLAVE_INIT: - subscribedToJournalUpdates.set(true); onSlaveInit(frame, (PBSlaveInit) frame.getPayload()); break; + case SLAVE_ONLINE: + onSlaveOnline(frame); + break; case FILE_TRANSFER: onFileTransfer(frame, (PBFileInfo) frame.getPayload()); break; @@ -187,8 +200,27 @@ public void transportResumed() { } + private void onSlaveOnline(ReplicationFrame frame) { + online = true; + deleteReplicationData(); + } + + private void deleteReplicationData() { + if( snapshotFile!=null ) { + snapshotFile.delete(); + snapshotFile=null; + } + if( journalReplicatedFiles!=null ) { + journalReplicatedFiles=null; + updateJournalReplicatedFiles(); + } + } + private void onSlaveInit(ReplicationFrame frame, PBSlaveInit slaveInit) throws Exception { + // Start sending journal updates to the slave. + subscribedToJournalUpdates.set(true); + // We could look at the slave state sent in the slaveInit and decide // that a full sync is not needed.. // but for now we will do a full sync every time. 
@@ -197,37 +229,78 @@ rc.setHeader(new PBHeader().setType(PBType.SLAVE_INIT_RESPONSE)); rc.setPayload(rcPayload); + // Setup a map of all the files that the slave has + final HashMap slaveFiles = new HashMap(); + for (PBFileInfo info : slaveInit.getCurrentFilesList()) { + slaveFiles.put(info.getName(), info); + } + + final KahaDBStore store = replicationServer.getStore(); store.checkpoint(new Callback() { public void execute() throws Exception { // This call back is executed once the checkpoint is - // completed and all data has been - // synced to disk, but while a lock is still held on the - // store so that no - // updates are allowed. - ArrayList infos = new ArrayList(); + // completed and all data has been synced to disk, + // but while a lock is still held on the store so + // that no updates are done while we are in this + // method. + + KahaDBStore store = replicationServer.getStore(); + int snapshotId = nextSnapshotId.incrementAndGet(); + File file = store.getPageFile().getFile(); + snapshotFile = new File(file.getParentFile(), "snapshot-" + snapshotId); + + journalReplicatedFiles = new HashSet(); + + // Store the list files associated with the snapshot. + ArrayList snapshotInfos = new ArrayList(); Map journalFiles = store.getJournal().getFileMap(); for (DataFile df : journalFiles.values()) { - infos.add(replicationServer.createInfo("journal-" + df.getDataFileId(), df.getFile(), df.getLength())); + // Look at what the slave has so that only the missing bits are transfered. + String name = "journal-" + df.getDataFileId(); + PBFileInfo slaveInfo = slaveFiles.get(name); + + // Use the checksum info to see if the slave has the file already.. Checksums are less acurrate for + // small amounts of data.. so ignore small files. + if( slaveInfo!=null && slaveInfo.getEnd()> 1024*512 ) { + // If the slave's file checksum matches what we have.. + if( replicationServer.checksum(df.getFile(), 0, slaveInfo.getEnd())==slaveInfo.getChecksum() ) { + // is Our file longer? 
then we need to continue transferring the rest of the file. + if( df.getLength() > slaveInfo.getEnd() ) { + snapshotInfos.add(replicationServer.createInfo(name, df.getFile(), slaveInfo.getEnd(), df.getLength())); + journalReplicatedFiles.add(df.getDataFileId()); + continue; + } else { + // No need to replicate this file. + continue; + } + } + } + + // If we got here then it means we need to transfer the whole file. + snapshotInfos.add(replicationServer.createInfo(name, df.getFile(), 0, df.getLength())); + journalReplicatedFiles.add(df.getDataFileId()); } + + PBFileInfo info = new PBFileInfo(); + info.setName("database"); + info.setSnapshotId(snapshotId); + info.setStart(0); + info.setEnd(file.length()); + info.setChecksum(copyAndChecksum(file, snapshotFile)); + snapshotInfos.add(info); - SnapshotStatus snapshot = createSnapshot(); - PBFileInfo databaseInfo = new PBFileInfo(); - databaseInfo.setName("database"); - databaseInfo.setSnapshotId(snapshot.id); - databaseInfo.setStart(0); - databaseInfo.setEnd(snapshot.size); - databaseInfo.setChecksum(snapshot.checksum); - infos.add(databaseInfo); + rcPayload.setCopyFilesList(snapshotInfos); - rcPayload.setCopyFilesList(infos); + updateJournalReplicatedFiles(); } + }); transport.oneway(rc); } - + private void onFileTransfer(ReplicationFrame frame, PBFileInfo fileInfo) throws IOException { File file = replicationServer.getReplicationFile(fileInfo.getName()); long payloadSize = fileInfo.getEnd()-fileInfo.getStart(); @@ -254,38 +327,20 @@ } - static class SlaveStatus { - String salve_id; - PBJournalLocation lastAck; - Integer syncingSnapshot; - } - - static class SnapshotStatus { - int id; - File file; - long checksum; - PBJournalLocation lastJournalLocation; - long size; - } - - - - int nextSnapshotId; - SnapshotStatus currentSnapshot; - private SnapshotStatus createSnapshot() throws IOException { - if (currentSnapshot == null) { - currentSnapshot = new SnapshotStatus(); - currentSnapshot.id = nextSnapshotId++; - 
KahaDBStore store = replicationServer.getStore(); - File file = store.getPageFile().getFile(); - currentSnapshot.file = new File(file.getParentFile(), "snapshot-" + currentSnapshot.id); - currentSnapshot.checksum = copyAndChecksum(file, currentSnapshot.file); - currentSnapshot.lastJournalLocation = convert(store.getJournal().getLastAppendLocation()); - currentSnapshot.size = currentSnapshot.file.length(); + /** + * Looks at all the journal files currently being replicated and informs KahaDB so that + * it does not delete them while the replication is occurring. + */ + private void updateJournalReplicatedFiles() { + HashSet files = replicationServer.getStore().getJournalFilesBeingReplicated(); + files.clear(); + for (ReplicationSession session : sessions) { + if( session.journalReplicatedFiles !=null ) { + files.addAll(session.journalReplicatedFiles); + } } - return currentSnapshot; } - + private PBJournalLocation convert(Location loc) { if( loc==null ) { return null; @@ -313,11 +368,11 @@ } finally { try { is.close(); - } finally { + } catch(Throwable e) { } try { os.close(); - } finally { + } catch(Throwable e) { } } } Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=713149&r1=713148&r2=713149&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java Tue Nov 11 12:10:04 2008 @@ -17,8 +17,8 @@ package org.apache.kahadb.replication; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; +import java.io.RandomAccessFile; import java.util.zip.Adler32; import java.util.zip.Checksum; @@ -211,24 +211,36 @@ } } - PBFileInfo 
createInfo(String name, File file, long length) throws IOException { + PBFileInfo createInfo(String name, File file, long start, long length) throws IOException { PBFileInfo rc = new PBFileInfo(); rc.setName(name); - FileInputStream is = new FileInputStream(file); - byte buffer[] = new byte[1024 * 4]; - int c; - - long size = 0; - Checksum checksum = new Adler32(); - while (size < length && (c = is.read(buffer, 0, (int) Math.min(length - size, buffer.length))) >= 0) { - checksum.update(buffer, 0, c); - size += c; - } - rc.setChecksum(checksum.getValue()); - rc.setStart(0); - rc.setEnd(size); + rc.setChecksum(checksum(file, start, length)); + rc.setStart(start); + rc.setEnd(length); return rc; } + + long checksum(File file, long start, long end) throws IOException { + RandomAccessFile raf = new RandomAccessFile(file, "r"); + try { + Checksum checksum = new Adler32(); + byte buffer[] = new byte[1024 * 4]; + int c; + long pos = start; + raf.seek(start); + + while (pos < end && (c = raf.read(buffer, 0, (int) Math.min(end - pos, buffer.length))) >= 0) { + checksum.update(buffer, 0, c); + pos += c; + } + + return checksum.getValue(); + } finally { + try { raf.close(); } catch (Throwable e){} + } + } + + public boolean isMaster() { return master!=null; } Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=713149&r1=713148&r2=713149&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Tue Nov 11 12:10:04 2008 @@ -98,11 +98,11 @@ continue; } - PBFileInfo info = replicationServer.createInfo("database", file, file.length()); + PBFileInfo info = 
replicationServer.createInfo("database", file, 0, file.length()); info.setSnapshotId(snapshot); infosMap.put("database", info); } else if( name.startsWith("journal-") ) { - PBFileInfo info = replicationServer.createInfo(name, file, file.length()); + PBFileInfo info = replicationServer.createInfo(name, file, 0, file.length()); infosMap.put(name, info); } } @@ -117,12 +117,12 @@ if( infosMap.containsKey(name) ) { continue; } - infosMap.put(name, replicationServer.createInfo(name, df.getFile(), df.getLength())); + infosMap.put(name, replicationServer.createInfo(name, df.getFile(), 0, df.getLength())); } if( !infosMap.containsKey("database") ) { File pageFile = store.getPageFile().getFile(); if( pageFile.exists() ) { - infosMap.put("database", replicationServer.createInfo("database", pageFile, pageFile.length())); + infosMap.put("database", replicationServer.createInfo("database", pageFile, 0, pageFile.length())); } } @@ -208,13 +208,15 @@ // Once the data has been synced.. we are going to // go into an online recovery mode... file = replicationServer.getReplicationFile(name); - onlineRecovery=true; } journalUpateFile = new RandomAccessFile(file, "rw"); journalUpdateFileId = location.getFileId(); - } + } journalUpateFile.seek(location.getOffset()); journalUpateFile.write(data); + if( !bulkSynchronizing ) { + onlineRecovery=true; + } } if( onlineRecovery ) { @@ -236,8 +238,12 @@ private void commitBulkTransfer() throws IOException { synchronized (transferMutex) { - journalUpateFile.close(); - journalUpateFile=null; + LOG.info("Slave synhcronization complete, going online..."); + + if( journalUpateFile!=null ) { + journalUpateFile.close(); + journalUpateFile=null; + } replicationServer.getStore().close(); // If we got a new snapshot of the database, then we need to @@ -258,7 +264,15 @@ bulkSynchronizing=false; replicationServer.getStore().open(); + + LOG.info("Slave is now online. 
We are now eligible to become the master."); } + + // Let the master know we are now online. + ReplicationFrame frame = new ReplicationFrame(); + frame.setHeader(new PBHeader().setType(PBType.SLAVE_ONLINE)); + transport.oneway(frame); + replicationServer.getStore().incrementalRecover(); } Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=713149&r1=713148&r2=713149&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Tue Nov 11 12:10:04 2008 @@ -417,19 +417,16 @@ public void checkpoint(Callback closure) throws Exception { - try { - synchronized (indexMutex) { - pageFile.tx().execute(new Transaction.Closure() { - public void execute(Transaction tx) throws IOException { - checkpointUpdate(tx, false); - } - }); - pageFile.flush(); - closure.execute(); - } - store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true); - } catch (IOException e) { + synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + checkpointUpdate(tx, false); + } + }); + pageFile.flush(); + closure.execute(); } + store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true); } // ///////////////////////////////////////////////////////////////// @@ -619,6 +616,7 @@ // ///////////////////////////////////////////////////////////////// protected final Object indexMutex = new Object(); + private final HashSet journalFilesBeingReplicated = new HashSet(); private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { StoredDestination sd = 
getStoredDestination(command.getDestination(), tx); @@ -775,7 +773,7 @@ }); } - + inUseFiles.addAll(journalFilesBeingReplicated); Location l = metadata.lastUpdate; if( metadata.firstInProgressTransactionLocation!=null ) { l = metadata.firstInProgressTransactionLocation; @@ -787,12 +785,17 @@ LOG.debug("Checkpoint done."); } + + public HashSet getJournalFilesBeingReplicated() { + return journalFilesBeingReplicated; + } // ///////////////////////////////////////////////////////////////// // StoredDestination related implementation methods. // ///////////////////////////////////////////////////////////////// - private final HashMap storedDestinations = new HashMap(); + + private final HashMap storedDestinations = new HashMap(); class StoredSubscription { SubscriptionInfo subscriptionInfo; Modified: activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto?rev=713149&r1=713148&r2=713149&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto (original) +++ activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto Tue Nov 11 12:10:04 2008 @@ -42,14 +42,19 @@ // @followed-by PBSlaveInitResponse SLAVE_INIT_RESPONSE = 1; + // The Slave will send this command to the master once it has completed + // all of its bulk synchronizations and is ready to take over as the master. + // + // @followed-by null + SLAVE_ONLINE=2; + // Sent from the Master to the slave to replicate a Journal update. // // @followed-by PBJournalUpdate JOURNAL_UPDATE=3; - // An ack sent from the Slave to a master to let the master know up to where in the journal the slave has - // synchronized to. This acknowledges receipt of all previous journal records. This should not be sent until - // all bulk file copies are complete. 
+ // An ack sent back to the master in response to a received + // JOURNAL_UPDATE // // @followed-by PBJournalLocation JOURNAL_UPDATE_ACK=4; @@ -91,9 +96,15 @@ // The files that the slave should delete repeated string delete_files=2; } - message PBJournalUpdate { + // Journal location of the update. required PBJournalLocation location=1; + // The data that will be written at that location. required bytes data=2; + // Should the slave send back an ack for this update. + optional bool send_ack=3; + // If true, then the slave should do a disk sync before returning a + // JOURNAL_UPDATE_ACK + optional bool disk_sync=4; } Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=713149&r1=713148&r2=713149&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (original) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Tue Nov 11 12:10:04 2008 @@ -21,7 +21,6 @@ import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; @@ -77,7 +76,7 @@ cluster.setClusterState(clusterState); try { - sendMesagesTo(100, BROKER1_URI); + sendMesagesTo(BROKER1_URI, 100, "Pass 1: "); } catch( JMSException e ) { fail("b1 did not become a master."); } @@ -93,12 +92,12 @@ try { - sendMesagesTo(100, BROKER1_URI); + sendMesagesTo(BROKER1_URI, 100, "Pass 2: "); } catch( JMSException e ) { fail("Failed to send more messages..."); } - Thread.sleep(1000); + Thread.sleep(2000); // Make broker 2 the master. 
clusterState = new ClusterState(); @@ -133,14 +132,14 @@ } } - private void sendMesagesTo(int count, String brokerUri) throws JMSException { + private void sendMesagesTo(String brokerUri, int count, String msg) throws JMSException { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri); Connection con = cf.createConnection(); try { Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < count; i++) { - producer.send(session.createTextMessage("Hello: "+i)); + producer.send(session.createTextMessage(msg+i)); } } finally { try { con.close(); } catch (Throwable e) {}