activemq-commits mailing list archives

From chir...@apache.org
Subject svn commit: r720234 - in /activemq/trunk/kahadb/src: main/java/org/apache/kahadb/replication/ main/java/org/apache/kahadb/replication/zk/ test/java/org/apache/kahadb/replication/ test/resources/broker1/ test/resources/broker2/
Date Mon, 24 Nov 2008 17:44:13 GMT
Author: chirino
Date: Mon Nov 24 09:44:12 2008
New Revision: 720234

URL: http://svn.apache.org/viewvc?rev=720234&view=rev
Log:
- Switched to using camelCase in the XML element names to make things consistent.
- Replaced the asyncReplication property in the ReplicationService with a minimumReplicas
property. When set to 0, replication is asynchronous.
- Also removed the map that was used to track replication requests, since at most one sync
request is issued at a time.
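
For reference, the configuration change looks like this (illustrative fragment, mirroring the
ha.xml diffs below):

    Before:  <kahadb-replication ... asyncReplication="true">
    After:   <kahadbReplication ... minimumReplicas="0">

With minimumReplicas="0", journal updates are replicated asynchronously; with a value N >= 1,
a sync journal write blocks until N slaves have acknowledged the update.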


Modified:
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
    activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
    activemq/trunk/kahadb/src/test/resources/broker1/ha.xml
    activemq/trunk/kahadb/src/test/resources/broker2/ha.xml

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java Mon Nov 24 09:44:12 2008
@@ -26,7 +26,7 @@
  * he will create the actual BrokerService
  * 
  * @author chirino
- * @org.apache.xbean.XBean element="kahadb-replication-broker"
+ * @org.apache.xbean.XBean element="kahadbReplicationBroker"
  */
 public class ReplicationBrokerService extends BrokerService {
 

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Mon Nov 24 09:44:12 2008
@@ -23,9 +23,10 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -51,58 +52,61 @@
 import org.apache.kahadb.store.KahaDBStore;
 import org.apache.kahadb.util.ByteSequence;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
 
 public class ReplicationMaster implements Service, ClusterListener, ReplicationTarget {
 
-	private static final Log LOG = LogFactory.getLog(ReplicationService.class);
+    private static final Log LOG = LogFactory.getLog(ReplicationService.class);
 
-	private final ReplicationService replicationService;
+    private final ReplicationService replicationService;
+
+    private Object serverMutex = new Object() {
+    };
+    private TransportServer server;
+
+    private ArrayList<ReplicationSession> sessions = new ArrayList<ReplicationSession>();
+
+    private final AtomicInteger nextSnapshotId = new AtomicInteger();
+    private final Object requestMutex = new Object(){};
+    private Location requestLocation;
+    private CountDownLatch requestLatch;
+    private int minimumReplicas;
+    
+    public ReplicationMaster(ReplicationService replicationService) {
+        this.replicationService = replicationService;
+        minimumReplicas = replicationService.getMinimumReplicas();
+    }
+
+    public void start() throws Exception {
+        synchronized (serverMutex) {
+            server = TransportFactory.bind(new URI(replicationService.getUri()));
+            server.setAcceptListener(new TransportAcceptListener() {
+                public void onAccept(Transport transport) {
+                    try {
+                        synchronized (serverMutex) {
+                            ReplicationSession session = new ReplicationSession(transport);
+                            session.start();
+                            addSession(session);
+                        }
+                    } catch (Exception e) {
+                        LOG.info("Could not accept replication connection from slave at " + transport.getRemoteAddress() + ", due to: " + e, e);
+                    }
+                }
+
+                public void onAcceptError(Exception e) {
+                    LOG.info("Could not accept replication connection: " + e, e);
+                }
+            });
+            server.start();
+        }
+        replicationService.getStore().getJournal().setReplicationTarget(this);
+    }
 
-	private Object serverMutex = new Object() {};
-	private TransportServer server;
-	
-	private ArrayList<ReplicationSession> sessions = new ArrayList<ReplicationSession>();
-	
-	private final AtomicInteger nextSnapshotId = new AtomicInteger();
-    private final Map<Location, CountDownLatch> requestMap = new LinkedHashMap<Location, CountDownLatch>();
-
-	public ReplicationMaster(ReplicationService replicationService) {
-		this.replicationService = replicationService;
-	}
-
-	public void start() throws Exception {
-		synchronized (serverMutex) {
-			server = TransportFactory.bind(new URI(replicationService.getUri()));
-			server.setAcceptListener(new TransportAcceptListener() {
-				public void onAccept(Transport transport) {
-					try {
-						synchronized (serverMutex) {
-							ReplicationSession session = new ReplicationSession(transport);
-							session.start();
-							addSession(session);
-						}
-					} catch (Exception e) {
-						LOG.info("Could not accept replication connection from slave at " + transport.getRemoteAddress() + ", due to: " + e, e);
-					}
-				}
-
-				public void onAcceptError(Exception e) {
-					LOG.info("Could not accept replication connection: " + e, e);
-				}
-			});
-			server.start();
-		}
-		replicationService.getStore().getJournal().setReplicationTarget(this);
-	}
-	
     boolean isStarted() {
         synchronized (serverMutex) {
-            return server!=null;
+            return server != null;
         }
     }
-    
+
     public void stop() throws Exception {
         replicationService.getStore().getJournal().setReplicationTarget(null);
         synchronized (serverMutex) {
@@ -111,24 +115,24 @@
                 server = null;
             }
         }
-        
+
         ArrayList<ReplicationSession> sessionsSnapshot;
         synchronized (this.sessions) {
             sessionsSnapshot = this.sessions;
         }
-        
-        for (ReplicationSession session: sessionsSnapshot) {
+
+        for (ReplicationSession session : sessionsSnapshot) {
             session.stop();
         }
     }
 
-	protected void addSession(ReplicationSession session) {
-	    synchronized (sessions) {
-	        sessions = new ArrayList<ReplicationSession>(sessions);
-	        sessions.add(session);
+    protected void addSession(ReplicationSession session) {
+        synchronized (sessions) {
+            sessions = new ArrayList<ReplicationSession>(sessions);
+            sessions.add(session);
         }
     }
-	
+
     protected void removeSession(ReplicationSession session) {
         synchronized (sessions) {
             sessions = new ArrayList<ReplicationSession>(sessions);
@@ -136,352 +140,348 @@
         }
     }
 
-	public void onClusterChange(ClusterState config) {
-		// For now, we don't really care about changes in the slave config..
-	}
-
-	/**
-	 * This is called by the Journal so that we can replicate the update to the 
-	 * slaves.
-	 */
-	public void replicate(Location location, ByteSequence sequence, boolean sync) {
-	    ArrayList<ReplicationSession> sessionsSnapshot;
+    public void onClusterChange(ClusterState config) {
+        // For now, we don't really care about changes in the slave config..
+    }
+
+    /**
+     * This is called by the Journal so that we can replicate the update to the
+     * slaves.
+     */
+    public void replicate(Location location, ByteSequence sequence, boolean sync) {
+        ArrayList<ReplicationSession> sessionsSnapshot;
         synchronized (this.sessions) {
             // Hurrah for copy on write..
             sessionsSnapshot = this.sessions;
         }
-	    
 
-        // We may be configured to always do async replication..
-		if ( replicationService.isAsyncReplication() ) {
-		    sync=false;
-		}
-		CountDownLatch latch=null;
-		if( sync ) {
-    		latch = new CountDownLatch(1);
-            synchronized (requestMap) {
-                requestMap.put(location, latch);
-            }
-		}
-		
-		ReplicationFrame frame=null;
-		for (ReplicationSession session : sessionsSnapshot) {
-			if( session.subscribedToJournalUpdates.get() ) {
-			    
-			    // Lazy create the frame since we may have not avilable sessions to send to.
-			    if( frame == null ) {
-    		        frame = new ReplicationFrame();
+        // We may be able to always async replicate...
+        if (minimumReplicas==0) {
+            sync = false;
+        }
+        CountDownLatch latch = null;
+        if (sync) {
+            latch = new CountDownLatch(minimumReplicas);
+            synchronized (requestMutex) {
+                requestLatch = latch;
+                requestLocation = location;
+            }
+        }
+
+        ReplicationFrame frame = null;
+        for (ReplicationSession session : sessionsSnapshot) {
+            if (session.subscribedToJournalUpdates.get()) {
+
+                // Lazy create the frame since we may have not avilable sessions
+                // to send to.
+                if (frame == null) {
+                    frame = new ReplicationFrame();
                     frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
                     PBJournalUpdate payload = new PBJournalUpdate();
                     payload.setLocation(ReplicationSupport.convert(location));
                     payload.setData(new org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), sequence.getLength()));
                     payload.setSendAck(sync);
                     frame.setPayload(payload);
-			    }
+                }
+
+                // TODO: use async send threads so that the frames can be pushed
+                // out in parallel.
+                try {
+                    session.setLastUpdateLocation(location);
+                    session.transport.oneway(frame);
+                } catch (IOException e) {
+                    session.onException(e);
+                }
+            }
+        }
 
-				// TODO: use async send threads so that the frames can be pushed out in parallel. 
-				try {
-				    session.setLastUpdateLocation(location);
-					session.transport.oneway(frame);
-				} catch (IOException e) {
-					session.onException(e);
-				}
-			}
-		}
-		
         if (sync) {
             try {
                 int timeout = 500;
-                int counter=0;
-                while( true ) {
-                    if( latch.await(timeout, TimeUnit.MILLISECONDS) ) {
-                        synchronized (requestMap) {
-                            requestMap.remove(location);
-                        }
+                int counter = 0;
+                while (true) {
+                    if (latch.await(timeout, TimeUnit.MILLISECONDS)) {
                         return;
                     }
-                    if( !isStarted() ) {
+                    if (!isStarted()) {
                         return;
                     }
                     counter++;
-                    if( (counter%10)==0 ) {
-                        LOG.warn("KahaDB is waiting for slave to come online. "+(timeout*counter/1000.f)+" seconds have elapsed.");
+                    if ((counter % 10) == 0) {
+                        LOG.warn("KahaDB is waiting for slave to come online. " + (timeout * counter / 1000.f) + " seconds have elapsed.");
                     }
-                } 
+                }
             } catch (InterruptedException ignore) {
             }
         }
-		
-	}
-	
+
+    }
+
     private void ackAllFromTo(Location lastAck, Location newAck) {
-        if ( replicationService.isAsyncReplication() ) {
+        Location l;
+        java.util.concurrent.CountDownLatch latch;
+        synchronized (requestMutex) {
+            latch = requestLatch;
+            l = requestLocation;
+        }
+        if( l == null ) {
             return;
         }
         
-        ArrayList<Entry<Location, CountDownLatch>> entries;
-        synchronized (requestMap) {
-            entries = new ArrayList<Entry<Location, CountDownLatch>>(requestMap.entrySet());
-        }
-        boolean inRange=false;
-        for (Entry<Location, CountDownLatch> entry : entries) {
-            Location l = entry.getKey();
-            if( !inRange ) {
-                if( lastAck==null || lastAck.compareTo(l) < 0 ) {
-                    inRange=true;
-                }
-            }
-            if( inRange ) {
-                entry.getValue().countDown();
-                if( newAck!=null && l.compareTo(newAck) <= 0 ) {
-                    return;
-                }
+        if (lastAck == null || lastAck.compareTo(l) < 0) {
+            if (newAck != null && l.compareTo(newAck) <= 0) {
+                latch.countDown();
+                return;
             }
-        }
+        } 
     }
 
+    class ReplicationSession implements Service, TransportListener {
 
-	class ReplicationSession implements Service, TransportListener {
-
-		private final Transport transport;
-		private final AtomicBoolean subscribedToJournalUpdates = new AtomicBoolean();
+        private final Transport transport;
+        private final AtomicBoolean subscribedToJournalUpdates = new AtomicBoolean();
         private boolean stopped;
-		
-		private File snapshotFile;
-		private HashSet<Integer> journalReplicatedFiles;
-		private Location lastAckLocation;
+
+        private File snapshotFile;
+        private HashSet<Integer> journalReplicatedFiles;
+        private Location lastAckLocation;
         private Location lastUpdateLocation;
         private boolean online;
 
-		public ReplicationSession(Transport transport) {
-			this.transport = transport;
-		}
+        public ReplicationSession(Transport transport) {
+            this.transport = transport;
+        }
 
-		synchronized public void setLastUpdateLocation(Location lastUpdateLocation) {
+        synchronized public void setLastUpdateLocation(Location lastUpdateLocation) {
             this.lastUpdateLocation = lastUpdateLocation;
         }
 
         public void start() throws Exception {
-			transport.setTransportListener(this);
-			transport.start();
-		}
+            transport.setTransportListener(this);
+            transport.start();
+        }
 
         synchronized public void stop() throws Exception {
-		    if ( !stopped  ) { 
-		        stopped=true;
-    			deleteReplicationData();
-    			transport.stop();
-		    }
-		}
+            if (!stopped) {
+                stopped = true;
+                deleteReplicationData();
+                transport.stop();
+            }
+        }
 
-		synchronized private void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation location) {
+        synchronized private void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation location) {
             Location ack = ReplicationSupport.convert(location);
-		    if( online ) {
+            if (online) {
                 ackAllFromTo(lastAckLocation, ack);
-		    }
-            lastAckLocation=ack;
-	    }
-		
-		synchronized private void onSlaveOnline(ReplicationFrame frame) {
+            }
+            lastAckLocation = ack;
+        }
+
+        synchronized private void onSlaveOnline(ReplicationFrame frame) {
             deleteReplicationData();
-            online  = true;
-            if( lastAckLocation!=null ) {
+            online = true;
+            if (lastAckLocation != null) {
                 ackAllFromTo(null, lastAckLocation);
             }
-            
+
         }
 
         public void onCommand(Object command) {
-			try {
-				ReplicationFrame frame = (ReplicationFrame) command;
-				switch (frame.getHeader().getType()) {
-				case SLAVE_INIT:
-					onSlaveInit(frame, (PBSlaveInit) frame.getPayload());
-					break;
-				case SLAVE_ONLINE:
-					onSlaveOnline(frame);
-					break;
-				case FILE_TRANSFER:
-					onFileTransfer(frame, (PBFileInfo) frame.getPayload());
-					break;
-				case JOURNAL_UPDATE_ACK:
-					onJournalUpdateAck(frame, (PBJournalLocation) frame.getPayload());
-					break;
-				}
-			} catch (Exception e) {
-				LOG.warn("Slave request failed: "+e, e);
-				failed(e);
-			}
-		}
-
-		public void onException(IOException error) {
-			failed(error);
-		}
-
-		public void failed(Exception error) {
-			try {
-				stop();
-			} catch (Exception ignore) {
-			}
-		}
-
-		public void transportInterupted() {
-		}
-		public void transportResumed() {
-		}
-		
-		private void deleteReplicationData() {
-			if( snapshotFile!=null ) {
-				snapshotFile.delete();
-				snapshotFile=null;
-			}
-			if( journalReplicatedFiles!=null ) {
-				journalReplicatedFiles=null;
-				updateJournalReplicatedFiles();
-			}
-		}
-
-		private void onSlaveInit(ReplicationFrame frame, PBSlaveInit slaveInit) throws Exception {
-
-			// Start sending journal updates to the slave.
-			subscribedToJournalUpdates.set(true);
-
-			// We could look at the slave state sent in the slaveInit and decide
-			// that a full sync is not needed..
-			// but for now we will do a full sync every time.
-			ReplicationFrame rc = new ReplicationFrame();
-			final PBSlaveInitResponse rcPayload = new PBSlaveInitResponse();
-			rc.setHeader(new PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
-			rc.setPayload(rcPayload);
-			
-			// Setup a map of all the files that the slave has
-			final HashMap<String, PBFileInfo> slaveFiles = new HashMap<String, PBFileInfo>();
-			for (PBFileInfo info : slaveInit.getCurrentFilesList()) {
-				slaveFiles.put(info.getName(), info);
-			}
-			
-			
-			final KahaDBStore store = replicationService.getStore();
-			store.checkpoint(new Callback() {
-				public void execute() throws Exception {
-					// This call back is executed once the checkpoint is
-					// completed and all data has been synced to disk, 
-					// but while a lock is still held on the store so 
-					// that no updates are done while we are in this
-					// method.
-					
-					KahaDBStore store = replicationService.getStore();
-					if( lastAckLocation==null ) {
-					    lastAckLocation = store.getLastUpdatePosition();
-					}
-					
-					int snapshotId = nextSnapshotId.incrementAndGet();
-					File file = store.getPageFile().getFile();
-					File dir = replicationService.getTempReplicationDir();
-					dir.mkdirs();
-					snapshotFile = new File(dir, "snapshot-" + snapshotId);
-					
-					journalReplicatedFiles = new HashSet<Integer>();
-					
-					// Store the list files associated with the snapshot.
-					ArrayList<PBFileInfo> snapshotInfos = new ArrayList<PBFileInfo>();
-					Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
-					for (DataFile df : journalFiles.values()) {
-						// Look at what the slave has so that only the missing bits are transfered.
-						String name = "journal-" + df.getDataFileId();
-						PBFileInfo slaveInfo = slaveFiles.remove(name);
-						
-						// Use the checksum info to see if the slave has the file already.. Checksums are less acurrate for
-						// small amounts of data.. so ignore small files.
-						if( slaveInfo!=null && slaveInfo.getEnd()> 1024*512 ) {
-							// If the slave's file checksum matches what we have..
-							if( ReplicationSupport.checksum(df.getFile(), 0, slaveInfo.getEnd())==slaveInfo.getChecksum() ) {
-								// is Our file longer? then we need to continue transferring the rest of the file.
-								if( df.getLength() > slaveInfo.getEnd() ) {
-									snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), slaveInfo.getEnd(), df.getLength()));
-									journalReplicatedFiles.add(df.getDataFileId());
-									continue;
-								} else {
-									// No need to replicate this file.
-									continue;
-								}
-							} 
-						}
-						
-						// If we got here then it means we need to transfer the whole file.
-						snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), 0, df.getLength()));
-						journalReplicatedFiles.add(df.getDataFileId());
-					}
-
-					PBFileInfo info = new PBFileInfo();
-					info.setName("database");
-					info.setSnapshotId(snapshotId);
-					info.setStart(0);
-					info.setEnd(file.length());
-					info.setChecksum(ReplicationSupport.copyAndChecksum(file, snapshotFile));
-					snapshotInfos.add(info);
-					
-					rcPayload.setCopyFilesList(snapshotInfos);
-					ArrayList<String> deleteFiles = new ArrayList<String>();
-					slaveFiles.remove("database");
-					for (PBFileInfo unused : slaveFiles.values()) {
-						deleteFiles.add(unused.getName());
-					}
-					rcPayload.setDeleteFilesList(deleteFiles);
-					
-					updateJournalReplicatedFiles();
-				}
-
-			});
-			
-			transport.oneway(rc);
-		}
-		
-		private void onFileTransfer(ReplicationFrame frame, PBFileInfo fileInfo) throws IOException {
-			File file = replicationService.getReplicationFile(fileInfo.getName());
-			long payloadSize = fileInfo.getEnd()-fileInfo.getStart();
-			
-			if( file.length() < fileInfo.getStart()+payloadSize ) {
-				throw new IOException("Requested replication file dose not have enough data.");
-			}		
-			
-			ReplicationFrame rc = new ReplicationFrame();
-			rc.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(payloadSize));
-			
-			FileInputStream is = new FileInputStream(file);
-			rc.setPayload(is);
-			try {
-				is.skip(fileInfo.getStart());
-				transport.oneway(rc);
-			} finally {
-				try {
-					is.close();
-				} catch (Throwable e) {
-				}
-			}
-		}
-
-	}
-
-	/**
-	 * Looks at all the journal files being currently replicated and informs the KahaDB so that
-	 * it does not delete them while the replication is occuring.
-	 */
-	private void updateJournalReplicatedFiles() {
-		HashSet<Integer>  files = replicationService.getStore().getJournalFilesBeingReplicated();
-		files.clear();
+            try {
+                ReplicationFrame frame = (ReplicationFrame)command;
+                switch (frame.getHeader().getType()) {
+                case SLAVE_INIT:
+                    onSlaveInit(frame, (PBSlaveInit)frame.getPayload());
+                    break;
+                case SLAVE_ONLINE:
+                    onSlaveOnline(frame);
+                    break;
+                case FILE_TRANSFER:
+                    onFileTransfer(frame, (PBFileInfo)frame.getPayload());
+                    break;
+                case JOURNAL_UPDATE_ACK:
+                    onJournalUpdateAck(frame, (PBJournalLocation)frame.getPayload());
+                    break;
+                }
+            } catch (Exception e) {
+                LOG.warn("Slave request failed: " + e, e);
+                failed(e);
+            }
+        }
+
+        public void onException(IOException error) {
+            failed(error);
+        }
+
+        public void failed(Exception error) {
+            try {
+                stop();
+            } catch (Exception ignore) {
+            }
+        }
+
+        public void transportInterupted() {
+        }
+
+        public void transportResumed() {
+        }
+
+        private void deleteReplicationData() {
+            if (snapshotFile != null) {
+                snapshotFile.delete();
+                snapshotFile = null;
+            }
+            if (journalReplicatedFiles != null) {
+                journalReplicatedFiles = null;
+                updateJournalReplicatedFiles();
+            }
+        }
+
+        private void onSlaveInit(ReplicationFrame frame, PBSlaveInit slaveInit) throws Exception {
+
+            // Start sending journal updates to the slave.
+            subscribedToJournalUpdates.set(true);
+
+            // We could look at the slave state sent in the slaveInit and decide
+            // that a full sync is not needed..
+            // but for now we will do a full sync every time.
+            ReplicationFrame rc = new ReplicationFrame();
+            final PBSlaveInitResponse rcPayload = new PBSlaveInitResponse();
+            rc.setHeader(new PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
+            rc.setPayload(rcPayload);
+
+            // Setup a map of all the files that the slave has
+            final HashMap<String, PBFileInfo> slaveFiles = new HashMap<String, PBFileInfo>();
+            for (PBFileInfo info : slaveInit.getCurrentFilesList()) {
+                slaveFiles.put(info.getName(), info);
+            }
+
+            final KahaDBStore store = replicationService.getStore();
+            store.checkpoint(new Callback() {
+                public void execute() throws Exception {
+                    // This call back is executed once the checkpoint is
+                    // completed and all data has been synced to disk,
+                    // but while a lock is still held on the store so
+                    // that no updates are done while we are in this
+                    // method.
+
+                    KahaDBStore store = replicationService.getStore();
+                    if (lastAckLocation == null) {
+                        lastAckLocation = store.getLastUpdatePosition();
+                    }
+
+                    int snapshotId = nextSnapshotId.incrementAndGet();
+                    File file = store.getPageFile().getFile();
+                    File dir = replicationService.getTempReplicationDir();
+                    dir.mkdirs();
+                    snapshotFile = new File(dir, "snapshot-" + snapshotId);
+
+                    journalReplicatedFiles = new HashSet<Integer>();
+
+                    // Store the list files associated with the snapshot.
+                    ArrayList<PBFileInfo> snapshotInfos = new ArrayList<PBFileInfo>();
+                    Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
+                    for (DataFile df : journalFiles.values()) {
+                        // Look at what the slave has so that only the missing
+                        // bits are transfered.
+                        String name = "journal-" + df.getDataFileId();
+                        PBFileInfo slaveInfo = slaveFiles.remove(name);
+
+                        // Use the checksum info to see if the slave has the
+                        // file already.. Checksums are less acurrate for
+                        // small amounts of data.. so ignore small files.
+                        if (slaveInfo != null && slaveInfo.getEnd() > 1024 * 512) {
+                            // If the slave's file checksum matches what we
+                            // have..
+                            if (ReplicationSupport.checksum(df.getFile(), 0, slaveInfo.getEnd()) == slaveInfo.getChecksum()) {
+                                // is Our file longer? then we need to continue
+                                // transferring the rest of the file.
+                                if (df.getLength() > slaveInfo.getEnd()) {
+                                    snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), slaveInfo.getEnd(), df.getLength()));
+                                    journalReplicatedFiles.add(df.getDataFileId());
+                                    continue;
+                                } else {
+                                    // No need to replicate this file.
+                                    continue;
+                                }
+                            }
+                        }
+
+                        // If we got here then it means we need to transfer the
+                        // whole file.
+                        snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), 0, df.getLength()));
+                        journalReplicatedFiles.add(df.getDataFileId());
+                    }
+
+                    PBFileInfo info = new PBFileInfo();
+                    info.setName("database");
+                    info.setSnapshotId(snapshotId);
+                    info.setStart(0);
+                    info.setEnd(file.length());
+                    info.setChecksum(ReplicationSupport.copyAndChecksum(file, snapshotFile));
+                    snapshotInfos.add(info);
+
+                    rcPayload.setCopyFilesList(snapshotInfos);
+                    ArrayList<String> deleteFiles = new ArrayList<String>();
+                    slaveFiles.remove("database");
+                    for (PBFileInfo unused : slaveFiles.values()) {
+                        deleteFiles.add(unused.getName());
+                    }
+                    rcPayload.setDeleteFilesList(deleteFiles);
+
+                    updateJournalReplicatedFiles();
+                }
+
+            });
+
+            transport.oneway(rc);
+        }
+
+        private void onFileTransfer(ReplicationFrame frame, PBFileInfo fileInfo) throws IOException {
+            File file = replicationService.getReplicationFile(fileInfo.getName());
+            long payloadSize = fileInfo.getEnd() - fileInfo.getStart();
+
+            if (file.length() < fileInfo.getStart() + payloadSize) {
+                throw new IOException("Requested replication file dose not have enough data.");
+            }
+
+            ReplicationFrame rc = new ReplicationFrame();
+            rc.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(payloadSize));
+
+            FileInputStream is = new FileInputStream(file);
+            rc.setPayload(is);
+            try {
+                is.skip(fileInfo.getStart());
+                transport.oneway(rc);
+            } finally {
+                try {
+                    is.close();
+                } catch (Throwable e) {
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Looks at all the journal files being currently replicated and informs the
+     * KahaDB so that it does not delete them while the replication is occuring.
+     */
+    private void updateJournalReplicatedFiles() {
+        HashSet<Integer> files = replicationService.getStore().getJournalFilesBeingReplicated();
+        files.clear();
 
         ArrayList<ReplicationSession> sessionsSnapshot;
         synchronized (this.sessions) {
             // Hurrah for copy on write..
             sessionsSnapshot = this.sessions;
         }
-        
-		for (ReplicationSession session : sessionsSnapshot) {
-			if( session.journalReplicatedFiles !=null ) {
-				files.addAll(session.journalReplicatedFiles);
-			}
-		}
-	}
-	
+
+        for (ReplicationSession session : sessionsSnapshot) {
+            if (session.journalReplicatedFiles != null) {
+                files.addAll(session.journalReplicatedFiles);
+            }
+        }
+    }
+
 }
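
The net effect of the ReplicationMaster changes above is that the per-location request map
collapses into a single shared latch, since only one sync request is ever outstanding. In
outline (a paraphrase of the committed code above, not a drop-in snippet):

    // sync path in ReplicationMaster.replicate():
    CountDownLatch latch = new CountDownLatch(minimumReplicas);
    synchronized (requestMutex) {
        requestLatch = latch;        // at most one sync request is outstanding
        requestLocation = location;
    }
    // ... broadcast the journal update frame to subscribed slave sessions ...
    latch.await(timeout, TimeUnit.MILLISECONDS); // awaited in a retry loop until
                                                 // minimumReplicas slaves have acked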

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java Mon Nov 24 09:44:12 2008
@@ -34,7 +34,7 @@
  * slave or master facets of the broker.
  * 
  * @author chirino
- * @org.apache.xbean.XBean element="kahadb-replication"
+ * @org.apache.xbean.XBean element="kahadbReplication"
  */
 public class ReplicationService implements Service, ClusterListener {
 
@@ -47,7 +47,7 @@
     private File tempReplicationDir;
     private String uri;
     private ClusterStateManager cluster;
-    private boolean asyncReplication=false;
+    private int minimumReplicas=1;
     
     private KahaDBStore store;
 
@@ -279,12 +279,12 @@
         this.cluster = cluster;
     }
 
-    public void setAsyncReplication(boolean asyncReplication) {
-        this.asyncReplication = asyncReplication;
+    public int getMinimumReplicas() {
+        return minimumReplicas;
     }
 
-    public boolean isAsyncReplication() {
-        return asyncReplication;
+    public void setMinimumReplicas(int minimumReplicas) {
+        this.minimumReplicas = minimumReplicas;
     }
 
 

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java Mon Nov 24 09:44:12 2008
@@ -48,7 +48,7 @@
 /**
  * 
  * @author chirino
- * @org.apache.xbean.XBean element="zookeeper-cluster"
+ * @org.apache.xbean.XBean element="zookeeperCluster"
  */
 public class ZooKeeperClusterStateManager implements ClusterStateManager, Watcher {
     private static final Log LOG = LogFactory.getLog(ZooKeeperClusterStateManager.class);

Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (original)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Mon Nov 24 09:44:12 2008
@@ -49,7 +49,7 @@
 		StaticClusterStateManager cluster = new StaticClusterStateManager();
 		
 		ReplicationService rs1 = new ReplicationService();
-		rs1.setAsyncReplication(true);
+		rs1.setMinimumReplicas(0);
 		rs1.setUri(BROKER1_REPLICATION_ID);
 		rs1.setCluster(cluster);
 		rs1.setDirectory(new File("target/replication-test/broker1"));
@@ -57,7 +57,7 @@
 		rs1.start();
 
         ReplicationService rs2 = new ReplicationService();
-        rs2.setAsyncReplication(true);
+        rs2.setMinimumReplicas(0);
         rs2.setUri(BROKER2_REPLICATION_ID);
         rs2.setCluster(cluster);
         rs2.setDirectory(new File("target/replication-test/broker2"));

Modified: activemq/trunk/kahadb/src/test/resources/broker1/ha.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/resources/broker1/ha.xml?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/resources/broker1/ha.xml (original)
+++ activemq/trunk/kahadb/src/test/resources/broker1/ha.xml Mon Nov 24 09:44:12 2008
@@ -27,21 +27,21 @@
 
   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
 
-  <kahadb-replication-broker xmlns="http://activemq.apache.org/schema/kahadb">
+  <kahadbReplicationBroker xmlns="http://activemq.apache.org/schema/kahadb">
   	<replicationService>
-	  <kahadb-replication
+	  <kahadbReplication
     	directory="target/kaha-data/broker1" 
     	brokerURI="xbean:broker1/ha-broker.xml" 
     	uri="kdbr://localhost:6001"
-    	asyncReplication="true">
+    	minimumReplicas="0">
     	
     	<cluster>
-    		<zookeeper-cluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>
+    		<zookeeperCluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>
     	</cluster>
     	
-      </kahadb-replication>
+      </kahadbReplication>
   	</replicationService>
-  </kahadb-replication-broker>
+  </kahadbReplicationBroker>
   
 </beans>
 <!-- END SNIPPET: example -->

Modified: activemq/trunk/kahadb/src/test/resources/broker2/ha.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/resources/broker2/ha.xml?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/resources/broker2/ha.xml (original)
+++ activemq/trunk/kahadb/src/test/resources/broker2/ha.xml Mon Nov 24 09:44:12 2008
@@ -27,21 +27,21 @@
 
   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
 
-  <kahadb-replication-broker xmlns="http://activemq.apache.org/schema/kahadb">
+  <kahadbReplicationBroker xmlns="http://activemq.apache.org/schema/kahadb">
   	<replicationService>
-	  <kahadb-replication
+	  <kahadbReplication
     	directory="target/kaha-data-broker2" 
     	brokerURI="xbean:broker2/ha-broker.xml" 
     	uri="kdbr://localhost:6002"
-    	asyncReplication="true">
+    	minimumReplicas="0">
     	
     	<cluster>
-    		<zookeeper-cluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>
+    		<zookeeperCluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>
     	</cluster>
     	
-      </kahadb-replication>
+      </kahadbReplication>
   	</replicationService>
-  </kahadb-replication-broker>
+  </kahadbReplicationBroker>
   
 </beans>
 <!-- END SNIPPET: example -->


