activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r718695 - in /activemq/sandbox/kahadb: ./ src/main/java/org/apache/kahadb/replication/ src/main/java/org/apache/kahadb/replication/zk/ src/main/java/org/apache/kahadb/store/ src/test/java/org/apache/kahadb/replication/ src/test/java/org/apa...
Date Tue, 18 Nov 2008 19:58:56 GMT
Author: chirino
Date: Tue Nov 18 11:58:56 2008
New Revision: 718695

URL: http://svn.apache.org/viewvc?rev=718695&view=rev
Log:
- added xbean/spring support.
- added test cases showing how to use xbean/spring to configure replicated servers. 

Added:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java   (contents, props changed)
      - copied, changed from r718615, activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java   (contents, props changed)
      - copied, changed from r714049, activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java
    activemq/sandbox/kahadb/src/test/resources/broker1/
    activemq/sandbox/kahadb/src/test/resources/broker1/ha-broker.xml
    activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml
    activemq/sandbox/kahadb/src/test/resources/broker2/
    activemq/sandbox/kahadb/src/test/resources/broker2/ha-broker.xml
    activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml
Removed:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java
Modified:
    activemq/sandbox/kahadb/pom.xml
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java

Modified: activemq/sandbox/kahadb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/pom.xml?rev=718695&r1=718694&r2=718695&view=diff
==============================================================================
--- activemq/sandbox/kahadb/pom.xml (original)
+++ activemq/sandbox/kahadb/pom.xml Tue Nov 18 11:58:56 2008
@@ -139,6 +139,21 @@
   <build>
     <plugins>
       <plugin>
+        <groupId>org.apache.xbean</groupId>
+        <artifactId>maven-xbean-plugin</artifactId>
+        <version>3.4</version>
+        <executions>
+          <execution>
+            <configuration>
+              <namespace>http://activemq.apache.org/schema/kahadb</namespace>
+            </configuration>
+            <goals>
+              <goal>mapping</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
         <configuration>

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java?rev=718695&r1=718694&r2=718695&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java Tue Nov 18 11:58:56 2008
@@ -17,10 +17,50 @@
 package org.apache.kahadb.replication;
 
 import org.apache.activemq.Service;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
 
+/**
+ * This interface is used by the ReplicationService to know when
+ * it should switch between Slave and Master mode. 
+ * 
+ * @author chirino
+ */
 public interface ClusterStateManager extends Service {
 
+    /**
+     * Adds a ClusterListener which is used to get notifications
+     * of changes in the cluster state.
+     * @param listener
+     */
 	void addListener(ClusterListener listener);
+	
+	/**
+	 * Removes a previously added ClusterListener
+	 * @param listener
+	 */
 	void removeListener(ClusterListener listener);
 
+	/**
+	 * Adds a member to the cluster.  Adding a member does not mean it is online.
+	 * Some ClusterStateManagers may keep track of a persistent membership list
+	 * so that they can determine if there are enough nodes online to form a quorum
+	 * for the purposes of electing a master.
+	 * 
+	 * @param node
+	 */
+    public void addMember(final String node);
+    
+    /**
+     * Removes a previously added member.
+     * 
+     * @param node
+     */
+    public void removeMember(final String node);
+
+    /**
+     * Updates the status of the local node.
+     * 
+     * @param status
+     */
+    public void setMemberStatus(final PBClusterNodeStatus status);
 }

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java?rev=718695&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java Tue Nov 18 11:58:56 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.broker.BrokerService;
+
+/**
+ * This broker service does not start a broker itself.  It allows you to create an activemq.xml file
+ * which does not actually start a broker; start() and stop() simply delegate to the configured
+ * ReplicationService, since that service will create the actual BrokerService.
+ * 
+ * @author chirino
+ * @org.apache.xbean.XBean element="kahadb-replication-broker"
+ */
+public class ReplicationBrokerService extends BrokerService {
+
+    ReplicationService replicationService;
+    AtomicBoolean started = new AtomicBoolean();
+
+    public ReplicationService getReplicationService() {
+        return replicationService;
+    }
+
+    public void setReplicationService(ReplicationService replicationService) {
+        this.replicationService = replicationService;
+    }
+    
+    @Override
+    public void start() throws Exception {
+        if( started.compareAndSet(false, true) ) {
+            replicationService.start();
+        }
+    }
+    
+    @Override
+    public void stop() throws Exception {
+        if( started.compareAndSet(true, false) ) {
+            replicationService.stop();
+        }
+    }
+}

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=718695&r1=718694&r2=718695&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Tue Nov 18 11:58:56 2008
@@ -26,6 +26,7 @@
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.transport.Transport;
@@ -49,13 +50,11 @@
 import org.apache.kahadb.store.KahaDBStore;
 import org.apache.kahadb.util.ByteSequence;
 
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
-
 public class ReplicationMaster implements Service, ClusterListener, ReplicationTarget {
 
-	private static final Log LOG = LogFactory.getLog(ReplicationServer.class);
+	private static final Log LOG = LogFactory.getLog(ReplicationService.class);
 
-	private final ReplicationServer replicationServer;
+	private final ReplicationService replicationServer;
 
 	private Object serverMutex = new Object() {};
 	private TransportServer server;
@@ -63,7 +62,7 @@
 	
 	AtomicInteger nextSnapshotId = new AtomicInteger();
 
-	public ReplicationMaster(ReplicationServer replication1Server) {
+	public ReplicationMaster(ReplicationService replication1Server) {
 		this.replicationServer = replication1Server;
 	}
 
@@ -259,10 +258,10 @@
 						// small amounts of data.. so ignore small files.
 						if( slaveInfo!=null && slaveInfo.getEnd()> 1024*512 ) {
 							// If the slave's file checksum matches what we have..
-							if( replicationServer.checksum(df.getFile(), 0, slaveInfo.getEnd())==slaveInfo.getChecksum() ) {
+							if( ReplicationSupport.checksum(df.getFile(), 0, slaveInfo.getEnd())==slaveInfo.getChecksum() ) {
 								// is Our file longer? then we need to continue transferring the rest of the file.
 								if( df.getLength() > slaveInfo.getEnd() ) {
-									snapshotInfos.add(replicationServer.createInfo(name, df.getFile(), slaveInfo.getEnd(), df.getLength()));
+									snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), slaveInfo.getEnd(), df.getLength()));
 									journalReplicatedFiles.add(df.getDataFileId());
 									continue;
 								} else {
@@ -273,7 +272,7 @@
 						}
 						
 						// If we got here then it means we need to transfer the whole file.
-						snapshotInfos.add(replicationServer.createInfo(name, df.getFile(), 0, df.getLength()));
+						snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), 0, df.getLength()));
 						journalReplicatedFiles.add(df.getDataFileId());
 					}
 

Copied: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java (from r718615, activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java?p2=activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java&p1=activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java&r1=718615&r2=718695&rev=718695&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java Tue Nov 18 11:58:56 2008
@@ -18,9 +18,6 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.zip.Adler32;
-import java.util.zip.Checksum;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.BrokerFactory;
@@ -28,253 +25,256 @@
 import org.apache.activemq.util.IOHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.kahadb.page.PageFile;
-import org.apache.kahadb.replication.pb.PBFileInfo;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
 import org.apache.kahadb.store.KahaDBStore;
 
 /**
- * Handles interfacing with the ClusterStateManager and handles activating the slave or master facets of
- * the broker.
+ * Handles interfacing with the ClusterStateManager and handles activating the
+ * slave or master facets of the broker.
  * 
  * @author chirino
+ * @org.apache.xbean.XBean element="kahadb-replication"
  */
-public class ReplicationServer implements Service, ClusterListener {
+public class ReplicationService implements Service, ClusterListener {
 
-    private static final Log LOG = LogFactory.getLog(ReplicationServer.class);
+    private static final Log LOG = LogFactory.getLog(ReplicationService.class);
 
+    private String brokerURI = "xbean:broker.xml";
+    private File directory = new File(IOHelper.getDefaultDataDirectory());
+    private File tempReplicationDir;
+    private String uri;
+    private ClusterStateManager cluster;
+    
     private KahaDBStore store;
 
-	private BrokerService brokerService;
+    private ClusterState clusterState;
+    private BrokerService brokerService;
+    private ReplicationMaster master;
+    private ReplicationSlave slave;
+
+    public void start() throws Exception {
+        if( cluster==null ) {
+            throw new IllegalArgumentException("The cluster field has not been set.");
+        }
+        // The cluster will let us know about the cluster configuration,
+        // which lets us decide if we are going to be a slave or a master.
+        getStore().open();
+        cluster.addListener(this);
+        cluster.start();
+        
+        cluster.addMember(getUri());
+        cluster.setMemberStatus(createStatus(State.SLAVE_UNCONNECTED));
+    }
 
-    private File directory = new File(IOHelper.getDefaultDataDirectory());
+    public PBClusterNodeStatus createStatus(State state) throws IOException {
+        final PBClusterNodeStatus status = new PBClusterNodeStatus();
+        status.setConnectUri(getUri());
+        status.setLastUpdate(ReplicationSupport.convert(getStore().getLastUpdatePosition()));
+        status.setState(state);
+        return status;
+    }
 
-	public ReplicationServer() {
-	}
+    public void stop() throws Exception {
+        cluster.removeListener(this);
+        cluster.stop();
+        stopMaster();
+        stopSlave();
+        getStore().close();
+    }
 
-	public KahaDBStore getStore() {
-	    if( store == null ) {
-	        store = new KahaDBStore();
-	        store.setDirectory(directory);
-	    }
-		return store;
-	}
-	public File getDirectory() {
-        return directory;
+    public void onClusterChange(ClusterState clusterState) {
+        this.clusterState = clusterState;
+        try {
+            synchronized (cluster) {
+                if (areWeTheSlave(clusterState)) {
+                    // If we were the master we need to stop the master
+                    // service..
+                    stopMaster();
+                    // If the slave service was not yet started.. start it up.
+                    if( clusterState.getMaster()==null ) {
+                        stopSlave();
+                    } else {
+                        startSlave();
+                        slave.onClusterChange(clusterState);
+                    }
+                } else if (areWeTheMaster(clusterState)) {
+                    // If we were the slave we need to stop the slave service..
+                    stopSlave();
+                    // If the master service was not yet started.. start it up.
+                    startMaster();
+                    master.onClusterChange(clusterState);
+                } else {
+                    // We were not part of the configuration (not master nor
+                    // slave).
+                    // So we have to shutdown any running master or slave
+                    // services that may
+                    // have been running.
+                    stopMaster();
+                    stopSlave();
+                    getCluster().setMemberStatus(createStatus(State.SLAVE_UNCONNECTED));
+
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Unexpected Error: " + e, e);
+        }
     }
 
-    public void setDirectory(File directory) {
-        this.directory = directory;
+    private void startMaster() throws IOException, Exception {
+        if (master == null) {
+            LOG.info("Starting replication master.");
+            getCluster().setMemberStatus(createStatus(State.MASTER));
+            brokerService = createBrokerService();
+            brokerService.start();
+            master = new ReplicationMaster(this);
+            master.start();
+        }
     }
 
-    public String getBrokerURI() {
-        return brokerURI;
+    private void stopSlave() throws Exception {
+        if (slave != null) {
+            LOG.info("Stopping replication slave.");
+            slave.stop();
+            slave = null;
+        }
     }
 
-    public void setBrokerURI(String brokerURI) {
-        this.brokerURI = brokerURI;
+    private void startSlave() throws Exception {
+        if (slave == null) {
+            LOG.info("Starting replication slave.");
+            slave = new ReplicationSlave(this);
+            slave.start();
+        }
     }
 
-    public void setStore(KahaDBStore store) {
-		this.store = store;
-	}
+    private void stopMaster() throws Exception, IOException {
+        if (master != null) {
+            LOG.info("Stopping replication master.");
+            master.stop();
+            master = null;
+            brokerService.stop();
+            brokerService = null;
+            // Stopping the broker service actually stops the store
+            // too..
+            // so we need to open it back up.
+            getStore().open();
+        }
+    }
 
-	public String getUri() {
-		return uri;
-	}
+    public BrokerService getBrokerService() {
+        return brokerService;
+    }
 
-	public void setUri(String nodeId) {
-		this.uri = nodeId;
-	}
+    private BrokerService createBrokerService() throws Exception {
+        BrokerService rc = BrokerFactory.createBroker(brokerURI);
+        rc.setPersistenceAdapter(getStore());
+        return rc;
+    }
 
-	public ClusterStateManager getCluster() {
-		return cluster;
-	}
+    public ClusterState getClusterState() {
+        return clusterState;
+    }
 
-	public void setCluster(ClusterStateManager cluster) {
-		this.cluster = cluster;
-	}
+    private boolean areWeTheSlave(ClusterState config) {
+        return config.getSlaves().contains(uri);
+    }
 
-	PageFile pageFile;
-	String uri;
-	ClusterStateManager cluster;
+    private boolean areWeTheMaster(ClusterState config) {
+        return uri.equals(config.getMaster());
+    }
+    
+    ///////////////////////////////////////////////////////////////////
+    // Accessors
+    ///////////////////////////////////////////////////////////////////
+
+    public File getReplicationFile(String fn) throws IOException {
+        if (fn.equals("database")) {
+            return getStore().getPageFile().getFile();
+        }
+        if (fn.startsWith("journal-")) {
+            int id;
+            try {
+                id = Integer.parseInt(fn.substring("journal-".length()));
+            } catch (NumberFormatException e) {
+                throw new IOException("Unknown replication file name: " + fn);
+            }
+            return getStore().getJournal().getFile(id);
+        } else {
+            throw new IOException("Unknown replication file name: " + fn);
+        }
+    }
 
-	ReplicationMaster master;
-	ReplicationSlave slave;
 
-	private ClusterState clusterState;
+    public File getTempReplicationFile(String fn, int snapshotId) throws IOException {
+        if (fn.equals("database")) {
+            return new File(getTempReplicationDir(), "database-" + snapshotId);
+        }
+        if (fn.startsWith("journal-")) {
+            int id;
+            try {
+                id = Integer.parseInt(fn.substring("journal-".length()));
+            } catch (NumberFormatException e) {
+                throw new IOException("Unknown replication file name: " + fn);
+            }
+            return new File(getTempReplicationDir(), fn);
+        } else {
+            throw new IOException("Unknown replication file name: " + fn);
+        }
+    }
 
-	private File tempReplicationDir;
+    public boolean isMaster() {
+        return master != null;
+    }
 
-    private String brokerURI = "xbean:broker.xml";
+    public File getTempReplicationDir() {
+        if (tempReplicationDir == null) {
+            tempReplicationDir = new File(getStore().getDirectory(), "replication");
+        }
+        return tempReplicationDir;
+    }
+    public void setTempReplicationDir(File tempReplicationDir) {
+        this.tempReplicationDir = tempReplicationDir;
+    }
 
-	public void start() throws Exception {
-		// The cluster will let us know about the cluster configuration,
-		// which lets us decide if we are going to be a slave or a master.
-        getStore().open();
-		cluster.addListener(this);
-		cluster.start();
-	}
-
-	public void stop() throws Exception {
-		cluster.removeListener(this);
-		cluster.stop();
-		getStore().close();
-	}
-
-	public void onClusterChange(ClusterState clusterState) {
-		this.clusterState = clusterState;
-		try {
-			synchronized (cluster) {
-				if (areWeTheSlave(clusterState)) {
-					// If we were the master we need to stop the master service..
-					if (master != null) {
-						LOG.info("Shutting down master due to cluster state change.");
-						master.stop();
-						master = null;
-						brokerService.stop();
-						brokerService=null;
-						// Stopping the broker service actually stops the store too..
-						// so we need to open it back up.
-						getStore().open();
-					}
-					// If the slave service was not yet started.. start it up.
-					if (slave == null) {
-						LOG.info("Starting replication slave.");
-						slave = new ReplicationSlave(this);
-						slave.start();
-					}
-					slave.onClusterChange(clusterState);
-				} else if (areWeTheMaster(clusterState)) {
-					// If we were the slave we need to stop the slave service..
-					if (slave != null) {
-						LOG.info("Switching from Slave to Master.");
-						slave.stop();
-						slave = null;
-					}
-					// If the master service was not yet started.. start it up.
-					if (master == null) {
-						LOG.info("Starting Master.");
-						brokerService = createBrokerService();
-                        brokerService.start();
-						master = new ReplicationMaster(this);
-						master.start();
-					}
-					
-					master.onClusterChange(clusterState);					
-				} else {
-					// We were not part of the configuration (not master nor slave).
-					// So we have to shutdown any running master or slave services that may
-					// have been running.
-					if (master != null) {
-						LOG.info("Stoping master.. we were removed from the HA cluster.");
-						master.stop();
-						master = null;
-					}
-					if (slave != null) {
-						LOG.info("Stoping slave.. we were removed from the HA cluster.");
-						slave.stop();
-						slave = null;
-					}					
-				}
-			}
-		} catch (Exception e) {
-			LOG.warn("Unexpected Error: "+e, e);
-		}
-	}
+    public KahaDBStore getStore() {
+        if (store == null) {
+            store = new KahaDBStore();
+            store.setDirectory(directory);
+        }
+        return store;
+    }
+    public void setStore(KahaDBStore store) {
+        this.store = store;
+    }
 
-	public BrokerService getBrokerService() {
-        return brokerService;
+    public File getDirectory() {
+        return directory;
+    }
+    public void setDirectory(File directory) {
+        this.directory = directory;
+    }
+
+    public String getBrokerURI() {
+        return brokerURI;
+    }
+    public void setBrokerURI(String brokerURI) {
+        this.brokerURI = brokerURI;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+    public void setUri(String nodeId) {
+        this.uri = nodeId;
+    }
+    
+    public ClusterStateManager getCluster() {
+        return cluster;
+    }
+    public void setCluster(ClusterStateManager cluster) {
+        this.cluster = cluster;
     }
 
-    private BrokerService createBrokerService() throws Exception {
-	    BrokerService rc = BrokerFactory.createBroker(brokerURI );
-	    rc.setPersistenceAdapter(getStore());
-	    return rc;
-    }
-
-    public ClusterState getClusterState() {	    
-		return clusterState;
-	}
-
-	private boolean areWeTheSlave(ClusterState config) {
-		return config.getSlaves().contains(uri);
-	}
-	
-	private boolean areWeTheMaster(ClusterState config) {
-		return uri.equals(config.getMaster());
-	}
-
-	public File getReplicationFile(String fn) throws IOException {
-		if (fn.equals("database")) {
-			return getStore().getPageFile().getFile();
-		} if (fn.startsWith("journal-")) {
-			int id;
-			try {
-				id = Integer.parseInt(fn.substring("journal-".length()));
-			} catch (NumberFormatException e) {
-				throw new IOException("Unknown replication file name: "+fn);
-			}
-			return getStore().getJournal().getFile(id);
-		} else {
-			throw new IOException("Unknown replication file name: "+fn);
-		}
-	}
-
-	public File getTempReplicationDir() {
-		if( tempReplicationDir == null ) {
-			tempReplicationDir = new File( getStore().getDirectory(), "replication");
-		}
-		return tempReplicationDir;
-	}
-	
-	public File getTempReplicationFile(String fn, int snapshotId) throws IOException {
-		if (fn.equals("database")) {
-			return new File(getTempReplicationDir(), "database-"+snapshotId);
-		} if (fn.startsWith("journal-")) {
-			int id;
-			try {
-				id = Integer.parseInt(fn.substring("journal-".length()));
-			} catch (NumberFormatException e) {
-				throw new IOException("Unknown replication file name: "+fn);
-			}
-			return new File(getTempReplicationDir(), fn);
-		} else {
-			throw new IOException("Unknown replication file name: "+fn);
-		}
-	}
-	
-	PBFileInfo createInfo(String name, File file, long start, long length) throws IOException {
-		PBFileInfo rc = new PBFileInfo();
-		rc.setName(name);
-		rc.setChecksum(checksum(file, start, length));
-		rc.setStart(start);
-		rc.setEnd(length);
-		return rc;
-	}
-	
-	long checksum(File file, long start, long end) throws IOException {
-		RandomAccessFile raf = new RandomAccessFile(file, "r");
-		try {
-			Checksum checksum = new Adler32();
-			byte buffer[] = new byte[1024 * 4];
-			int c;
-			long pos = start;
-			raf.seek(start);
-			
-			while (pos < end && (c = raf.read(buffer, 0, (int) Math.min(end - pos, buffer.length))) >= 0) {
-				checksum.update(buffer, 0, c);
-				pos += c;
-			}
-			
-			return checksum.getValue();
-		} finally {
-			try { raf.close(); } catch (Throwable e){}
-		}
-	}
-
-	
-	public boolean isMaster() {
-		return master!=null;
-	}
 
 }

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=718695&r1=718694&r2=718695&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Tue Nov 18 11:58:56 2008
@@ -48,6 +48,7 @@
 import org.apache.kahadb.replication.pb.PBSlaveInit;
 import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
 import org.apache.kahadb.replication.pb.PBType;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
 import org.apache.kahadb.store.KahaDBStore;
 
 public class ReplicationSlave implements Service, ClusterListener, TransportListener {
@@ -56,7 +57,7 @@
 
 	private static final Log LOG = LogFactory.getLog(ReplicationSlave.class);
 
-	private final ReplicationServer replicationServer;
+	private final ReplicationService replicationServer;
 	private Transport transport;
 
 	// Used to bulk transfer the master state over to the slave..
@@ -73,14 +74,14 @@
 	RandomAccessFile journalUpateFile;
 	private String master;
 	
-	public ReplicationSlave(ReplicationServer replicationServer) {
+	public ReplicationSlave(ReplicationService replicationServer) {
 		this.replicationServer = replicationServer;
-		master = replicationServer.getClusterState().getMaster();
 	}
 
 	public void start() throws Exception {
 		if( started.compareAndSet(false, true)) {
-			doStart();
+	        onClusterChange(replicationServer.getClusterState());
+
 		}
 	}
 	
@@ -99,6 +100,8 @@
 				return;
 			}
 			
+			replicationServer.getCluster().setMemberStatus(replicationServer.createStatus(State.SLAVE_SYNCRONIZING));
+			
 			transport = TransportFactory.connect(new URI(master));
 			transport.setTransportListener(this);
 			transport.start();
@@ -133,11 +136,11 @@
 							continue;
 						}
 						
-						PBFileInfo info = replicationServer.createInfo("database", file, 0, file.length());
+						PBFileInfo info = ReplicationSupport.createInfo("database", file, 0, file.length());
 						info.setSnapshotId(snapshot);
 						infosMap.put("database", info);
 					} else if( name.startsWith("journal-") ) {
-						PBFileInfo info = replicationServer.createInfo(name, file, 0, file.length());
+						PBFileInfo info = ReplicationSupport.createInfo(name, file, 0, file.length());
 						infosMap.put(name, info);
 					}
 				}
@@ -152,12 +155,12 @@
 				if( infosMap.containsKey(name) ) {
 					continue;
 				}
-				infosMap.put(name, replicationServer.createInfo(name, df.getFile(), 0, df.getLength()));
+				infosMap.put(name, ReplicationSupport.createInfo(name, df.getFile(), 0, df.getLength()));
 			}
 			if( !infosMap.containsKey("database") ) {
 				File pageFile = store.getPageFile().getFile();
 				if( pageFile.exists() ) {
-					infosMap.put("database", replicationServer.createInfo("database", pageFile, 0, pageFile.length()));
+					infosMap.put("database", ReplicationSupport.createInfo("database", pageFile, 0, pageFile.length()));
 				}
 			}
 			
@@ -202,15 +205,14 @@
 
 	public void onClusterChange(ClusterState config) {
 		synchronized (transferMutex) {
-			// When the master changes.. we need to re-sync with the new master.
-			if( !master.equals(config.getMaster()) ) {
-				try {
-					doStop();
-					master = config.getMaster();
-					doStart();
-				} catch (Exception e) {
-					LOG.error("Could not restart syncing with new master: "+config.getMaster()+", due to: "+e,e);
-				}
+			try {
+	            if( master==null || !master.equals(config.getMaster()) ) {
+                    master = config.getMaster();
+		            doStop();
+				    doStart();
+	            }
+			} catch (Exception e) {
+				LOG.error("Could not restart syncing with new master: "+config.getMaster()+", due to: "+e,e);
 			}
 		}
 	}
@@ -327,9 +329,13 @@
 				online=true;
 				
 				replicationServer.getStore().open();
+				
+	            replicationServer.getCluster().setMemberStatus(replicationServer.createStatus(State.SLAVE_ONLINE));
 				LOG.info("Slave is now online.  We are now eligible to become the master.");
 			}
 			
+			
+			
 			// Let the master know we are now online.
 			ReplicationFrame frame = new ReplicationFrame();
 			frame.setHeader(new PBHeader().setType(PBType.SLAVE_ONLINE));

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java?rev=718695&r1=718694&r2=718695&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java Tue Nov 18 11:58:56 2008
@@ -20,10 +20,12 @@
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
 import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.replication.pb.PBFileInfo;
 import org.apache.kahadb.replication.pb.PBJournalLocation;
 
 public class ReplicationSupport {
@@ -72,6 +74,36 @@
         }
     }
 
+    public static PBFileInfo createInfo(String name, File file, long start, long length) throws IOException {
+        PBFileInfo rc = new PBFileInfo();
+        rc.setName(name);
+        rc.setChecksum(checksum(file, start, length));
+        rc.setStart(start);
+        rc.setEnd(length);
+        return rc;
+    }
+
+    public static long checksum(File file, long start, long end) throws IOException {
+        RandomAccessFile raf = new RandomAccessFile(file, "r");
+        try {
+            Checksum checksum = new Adler32();
+            byte buffer[] = new byte[1024 * 4];
+            int c;
+            long pos = start;
+            raf.seek(start);
+
+            while (pos < end && (c = raf.read(buffer, 0, (int) Math.min(end - pos, buffer.length))) >= 0) {
+                checksum.update(buffer, 0, c);
+                pos += c;
+            }
 
+            return checksum.getValue();
+        } finally {
+            try {
+                raf.close();
+            } catch (Throwable e) {
+            }
+        }
+    }
 
 }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java?rev=718695&r1=718694&r2=718695&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java Tue Nov 18 11:58:56 2008
@@ -45,6 +45,11 @@
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 
+/**
+ * 
+ * @author chirino
+ * @org.apache.xbean.XBean element="zookeeper-cluster"
+ */
 public class ZooKeeperClusterStateManager implements ClusterStateManager, Watcher {
     private static final Log LOG = LogFactory.getLog(ZooKeeperClusterStateManager.class);
 
@@ -54,16 +59,14 @@
     private String uri = "zk://localhost:2181/activemq/ha-cluster/default";
     String userid = "activemq";
     String password = "";
-    
+
     private ZooKeeper zk;
     private String path;
 
-    ClusterState clusterState;
+    private ClusterState clusterState;
     private String statusPath;
     private PBClusterNodeStatus memberStatus;
-
     private Thread takoverTask;
-
     private boolean areWeTheBestMaster;
 
     synchronized public void addListener(ClusterListener listener) {
@@ -98,16 +101,42 @@
             // Create a ZooKeeper connection..
             zk = createZooKeeperConnection();
 
-            mkParentDirs(path);
-            try {
-                zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            } catch (NodeExistsException ignore) {
+            while( isStarted() ) {
+                try {
+                    mkParentDirs(path);
+                    try {
+                        zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                    } catch (NodeExistsException ignore) {
+                    }
+                    processClusterStateChange();
+                    return;
+                } catch (Exception e) {
+                    handleZKError(e);
+                }
             }
+        }
+    }
+    
+    synchronized private boolean isStarted() {
+        return startCounter > 0;
+    }
+
+    synchronized public void stop() throws Exception {
+        startCounter--;
+        if (startCounter == 0) {
+            zk.close();
+            zk = null;
+            path=null;
+            clusterState=null;
+            statusPath=null;
+            memberStatus=null;
+            takoverTask=null;
+            areWeTheBestMaster=false;
             
-            processClusterStateChange();
         }
     }
 
+
     public String getPath() {
         if( path == null ) {
             try {
@@ -170,33 +199,35 @@
         }
     }
 
-    synchronized public void stop() throws Exception {
-        startCounter--;
-        if (startCounter == 0) {
-            zk.close();
-            zk = null;
-        }
-    }
-
     public void process(WatchedEvent event) {
         System.out.println("Got: " + event);
     }
 
-    public void setMemberStatus(final PBClusterNodeStatus status) throws InvalidProtocolBufferException, KeeperException, InterruptedException {
-        this.memberStatus = status;
-        if (statusPath == null) {
-            mkdirs(path + "/election");
-            statusPath = zk.create(path + "/election/n_", status.toUnframedByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-        } else {
-            Stat stat = zk.exists(statusPath, false);
-            if (status == null) {
-                zk.delete(statusPath, stat.getVersion());
-                statusPath = null;
-            } else {
-                zk.setData(statusPath, status.toUnframedByteArray(), stat.getVersion());
+    public void setMemberStatus(final PBClusterNodeStatus status) {
+        while( isStarted() ) {
+            try {
+                
+                this.memberStatus = status;
+                if (statusPath == null) {
+                    mkdirs(path + "/election");
+                    statusPath = zk.create(path + "/election/n_", status.toUnframedByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+                } else {
+                    Stat stat = zk.exists(statusPath, false);
+                    if (status == null) {
+                        zk.delete(statusPath, stat.getVersion());
+                        statusPath = null;
+                    } else {
+                        zk.setData(statusPath, status.toUnframedByteArray(), stat.getVersion());
+                    }
+                }
+                processElectionChange();
+                return;
+                
+            } catch (Exception e) {
+                e.printStackTrace();
+                handleZKError(e);
             }
         }
-        processElectionChange();
     }
 
     synchronized private void processElectionChange() {
@@ -279,7 +310,12 @@
 
     protected void takoverAttempt() {
         try {
-            Thread.sleep(5 * 1000);
+            for( int i=0; i < 10; i++ ) {
+                Thread.sleep(500);
+                if( !isStarted() )
+                    return;
+            }
+            
             synchronized(this) {
                 try {
                     if( areWeTheBestMaster ) {
@@ -334,20 +370,48 @@
         return rc;
     }
 
-    public void addMember(final String node) throws InvalidProtocolBufferException, KeeperException, InterruptedException {
-        mkParentDirs(path);
-        update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
-            public byte[] update(byte[] data) throws InvalidProtocolBufferException {
-                PBClusterConfiguration config = new PBClusterConfiguration();
-                if (data != null) {
-                    config.mergeUnframed(data);
-                }
-                if (!config.getMembersList().contains(node)) {
-                    config.addMembers(node);
-                }
-                return config.toFramedByteArray();
+    public void addMember(final String node) {
+        while( isStarted() ) {
+            try {
+                mkParentDirs(path);
+                update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
+                    public byte[] update(byte[] data) throws InvalidProtocolBufferException {
+                        PBClusterConfiguration config = new PBClusterConfiguration();
+                        if (data != null) {
+                            config.mergeUnframed(data);
+                        }
+                        if (!config.getMembersList().contains(node)) {
+                            config.addMembers(node);
+                        }
+                        return config.toFramedByteArray();
+                    }
+                });
+                return;
+            } catch (Exception e) {
+                handleZKError(e);
             }
-        });
+        }
+    }
+
+    public void removeMember(final String node) {
+        while( isStarted() ) {
+            try {
+                mkParentDirs(path);
+                update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
+                    public byte[] update(byte[] data) throws InvalidProtocolBufferException {
+                        PBClusterConfiguration config = new PBClusterConfiguration();
+                        if (data != null) {
+                            config.mergeUnframed(data);
+                        }
+                        config.getMembersList().remove(node);
+                        return config.toFramedByteArray();
+                    }
+                });
+                return;
+            } catch (Exception e) {
+                handleZKError(e);
+            }
+        }
     }
 
     public void setMaster(final String node) throws InvalidProtocolBufferException, KeeperException, InterruptedException {
@@ -412,5 +476,45 @@
             }
         }
     }
+
     
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    public String getUserid() {
+        return userid;
+    }
+
+    public void setUserid(String userid) {
+        this.userid = userid;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+    
+    private void handleZKError(Exception e) {
+        LOG.warn("ZooKeeper error.  Will retry operation in 1 seconds");
+        LOG.debug("The error was: "+e, e);
+        
+        for( int i=0; i < 10; i ++) { 
+            try {
+                if( !isStarted() )
+                    return;
+                Thread.sleep(100);
+            } catch (InterruptedException e1) {
+                return;
+            }
+        }
+    }
+
 }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=718695&r1=718694&r2=718695&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Tue Nov 18 11:58:56 2008
@@ -382,7 +382,11 @@
             nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
         }
 	}
-
+	
+    public Location getLastUpdatePosition() throws IOException {
+        return metadata.lastUpdate;
+    }
+    
 	private Location getRecoveryPosition() throws IOException {
 		
         // If we need to recover the transactions..

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=718695&r1=718694&r2=718695&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Tue Nov 18 11:58:56 2008
@@ -48,14 +48,14 @@
 		// This cluster object will control who becomes the master.
 		StaticClusterStateManager cluster = new StaticClusterStateManager();
 		
-		ReplicationServer rs1 = new ReplicationServer();
+		ReplicationService rs1 = new ReplicationService();
 		rs1.setUri(BROKER1_REPLICATION_ID);
 		rs1.setCluster(cluster);
 		rs1.setDirectory(new File("target/replication-test/broker1"));
 		rs1.setBrokerURI("broker://("+BROKER1_URI+")/broker1");
 		rs1.start();
 
-        ReplicationServer rs2 = new ReplicationServer();
+        ReplicationService rs2 = new ReplicationService();
         rs2.setUri(BROKER2_REPLICATION_ID);
         rs2.setCluster(cluster);
         rs2.setDirectory(new File("target/replication-test/broker2"));

Copied: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java (from r714049, activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java?p2=activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java&p1=activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java&r1=714049&r2=718695&rev=718695&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java Tue Nov 18 11:58:56 2008
@@ -18,6 +18,8 @@
 
 import java.util.ArrayList;
 
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+
 public class StaticClusterStateManager implements ClusterStateManager {
 
 	final private ArrayList<ClusterListener> listeners = new ArrayList<ClusterListener>();
@@ -59,4 +61,13 @@
 		startCounter--;
 	}
 
+    public void addMember(String node) {
+    }
+
+    public void removeMember(String node) {
+    }
+
+    public void setMemberStatus(PBClusterNodeStatus status) {
+    }
+
 }

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java?rev=718695&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java Tue Nov 18 11:58:56 2008
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.IOException;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerStats;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+
+public class XBeanReplicationTest extends TestCase {
+
+	
+	private static final String BROKER1_URI = "tcp://localhost:61616";
+	private static final String BROKER2_URI = "tcp://localhost:61617";
+
+	private Destination destination = new ActiveMQQueue("TEST_QUEUE");
+	
+    private static final int PORT = 2181;
+    private Factory serverFactory;
+
+	public void testReplication() throws Exception {
+		
+        startZooKeeper();
+
+		BrokerService broker1 = BrokerFactory.createBroker("xbean:broker1/ha.xml");
+        broker1.start();
+        
+        // Wait for the broker to get setup..
+        Thread.sleep(7000);
+        
+        sendMesagesTo(BROKER1_URI, 100, "Pass 1: ");
+
+        // Create broker 2 which will join in and sync up with the existing master.
+        BrokerService broker2 = BrokerFactory.createBroker("xbean:broker2/ha.xml");
+        broker2.start();
+		
+        // Give it some time to sync up..
+        Thread.sleep(1000);
+        
+        // Stopping broker1 should make broker2 the master.
+        broker1.stop();
+
+        Thread.sleep(1000);
+        
+        // Did all the messages get synced up?
+		assertReceived(100, BROKER2_URI);
+		// Send some more message... 
+        sendMesagesTo(BROKER2_URI, 50, "Pass 2: ");
+		
+		// Start broker1 up again.. it should re-sync with master 2 
+		broker1.start();
+        // Give it some time to sync up..
+        Thread.sleep(1000);
+		
+		// stopping the master.. 
+		broker2.stop();
+		
+		// Did the new state get synced right?
+        assertReceived(50, BROKER1_URI);
+		
+        broker1.stop();
+        
+        stopZooKeeper();
+
+	}
+
+    private void stopZooKeeper() {
+        serverFactory.shutdown();
+        ServerStats.unregister();
+    }
+
+    private void startZooKeeper() throws IOException, InterruptedException {
+        ServerStats.registerAsConcrete();
+        File zooKeeperData = new File("target/test-data/zookeeper-"+System.currentTimeMillis());
+        zooKeeperData.mkdirs();
+
+        // Reduces startup time..
+        System.setProperty("zookeeper.preAllocSize", "100");
+        FileTxnLog.setPreallocSize(100);
+        ZooKeeperServer zs = new ZooKeeperServer(zooKeeperData, zooKeeperData, 3000);
+        
+        serverFactory = new NIOServerCnxn.Factory(PORT);
+        serverFactory.startup(zs);
+    }
+
+	private void assertReceived(int count, String brokerUri) throws JMSException {
+		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+		Connection con = cf.createConnection();
+		con.start();
+		try {
+			Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			MessageConsumer consumer = session.createConsumer(destination);
+			for (int i = 0; i < count; i++) {
+				TextMessage m = (TextMessage) consumer.receive(1000);
+				if( m==null ) {
+					fail("Failed to receive message: "+i);
+				}
+				System.out.println("Got: "+m.getText());
+			}
+		} finally {
+			try { con.close(); } catch (Throwable e) {}
+		}
+	}
+
+	private void sendMesagesTo(String brokerUri, int count, String msg) throws JMSException {
+		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+		Connection con = cf.createConnection();
+		try {
+			Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			MessageProducer producer = session.createProducer(destination);
+			for (int i = 0; i < count; i++) {
+				producer.send(session.createTextMessage(msg+i));
+			}
+		} finally {
+			try { con.close(); } catch (Throwable e) {}
+		}
+	}
+	
+}

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java?rev=718695&r1=718694&r2=718695&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java Tue Nov 18 11:58:56 2008
@@ -50,51 +50,10 @@
     private ZooKeeper zk;
     private Factory serverFactory;
 
-    public static boolean waitForServerUp(String host, int port, long timeout) {
-        long start = System.currentTimeMillis();
-        while (true) {
-            try {
-                Socket sock = new Socket(host, port);
-                BufferedReader reader = null;
-                try {
-                    OutputStream outstream = sock.getOutputStream();
-                    outstream.write("stat".getBytes());
-                    outstream.flush();
-
-                    reader =
-                        new BufferedReader(
-                                new InputStreamReader(sock.getInputStream()));
-                    String line = reader.readLine();
-                    if (line != null && line.startsWith("Zookeeper version:")) {
-                        return true;
-                    }
-                } finally {
-                    sock.close();
-                    if (reader != null) {
-                        reader.close();
-                    }
-                }
-            } catch (IOException e) {
-            }
-
-            if (System.currentTimeMillis() > start + timeout) {
-                break;
-            }
-            try {
-                Thread.sleep(250);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-        return false;
-    }
-    
-
     @Override
     protected void setUp() throws Exception {
 
         ServerStats.registerAsConcrete();
-
         File tmpDir = new File("target/test-data/zookeeper");
         tmpDir.mkdirs();
 
@@ -107,8 +66,6 @@
         serverFactory = new NIOServerCnxn.Factory(PORT);
         serverFactory.startup(zs);
 
-//        assertTrue("waiting for server up", waitForServerUp("localhost", PORT, 1000*5));
-            
         zkcsm1 = new ZooKeeperClusterStateManager();
         zk = zkcsm1.createZooKeeperConnection();
         // Cleanup after previous run...

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java?rev=718695&r1=718694&r2=718695&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java Tue Nov 18 11:58:56 2008
@@ -22,7 +22,7 @@
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.perf.SimpleQueueTest;
 import org.apache.kahadb.replication.ClusterState;
-import org.apache.kahadb.replication.ReplicationServer;
+import org.apache.kahadb.replication.ReplicationService;
 import org.apache.kahadb.replication.StaticClusterStateManager;
 
 /**
@@ -36,8 +36,8 @@
 	private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002";
 
     protected String broker2BindAddress="tcp://localhost:61617";
-    private ReplicationServer rs1;
-    private ReplicationServer rs2;
+    private ReplicationService rs1;
+    private ReplicationService rs2;
 
 	@Override
 	protected BrokerService createBroker(String uri) throws Exception {
@@ -57,14 +57,14 @@
 		clusterState.setSlaves(Arrays.asList(slaves));
 		cluster.setClusterState(clusterState);
 
-        rs1 = new ReplicationServer();
+        rs1 = new ReplicationService();
         rs1.setUri(BROKER1_REPLICATION_ID);
         rs1.setCluster(cluster);
         rs1.setDirectory(new File("target/replication-test/broker1"));
         rs1.setBrokerURI("broker://("+uri+")/broker1");
         rs1.start();
 
-        rs2 = new ReplicationServer();
+        rs2 = new ReplicationService();
         rs2.setUri(BROKER2_REPLICATION_ID);
         rs2.setCluster(cluster);
         rs2.setDirectory(new File("target/replication-test/broker2"));

Added: activemq/sandbox/kahadb/src/test/resources/broker1/ha-broker.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/resources/broker1/ha-broker.xml?rev=718695&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/resources/broker1/ha-broker.xml (added)
+++ activemq/sandbox/kahadb/src/test/resources/broker1/ha-broker.xml Tue Nov 18 11:58:56 2008
@@ -0,0 +1,36 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd   
+  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
+
+    <broker xmlns="http://activemq.apache.org/schema/core" start="false" brokerName="localhost" dataDirectory="target/data" useJmx="false">
+
+        <!-- The transport connectors ActiveMQ will listen to -->
+        <transportConnectors>
+            <transportConnector name="openwire" uri="tcp://localhost:61616"/>
+        </transportConnectors>
+
+    </broker>
+
+</beans>
+<!-- END SNIPPET: example -->

Added: activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml?rev=718695&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml (added)
+++ activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml Tue Nov 18 11:58:56 2008
@@ -0,0 +1,46 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:kdb="http://activemq.apache.org/schema/kahadb"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd   
+  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
+  http://activemq.apache.org/schema/kahadb http://activemq.apache.org/schema/kahadb/kahadb.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <kahadb-replication-broker xmlns="http://activemq.apache.org/schema/kahadb">
+  	<replicationService>
+	  <kahadb-replication
+    	directory="target/kaha-data/broker1" 
+    	brokerURI="xbean:broker1/ha-broker.xml" 
+    	uri="kdbr://localhost:6001">
+    	
+    	<cluster>
+    		<zookeeper-cluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>
+    	</cluster>
+    	
+      </kahadb-replication>
+  	</replicationService>
+  </kahadb-replication-broker>
+  
+</beans>
+<!-- END SNIPPET: example -->

Added: activemq/sandbox/kahadb/src/test/resources/broker2/ha-broker.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/resources/broker2/ha-broker.xml?rev=718695&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/resources/broker2/ha-broker.xml (added)
+++ activemq/sandbox/kahadb/src/test/resources/broker2/ha-broker.xml Tue Nov 18 11:58:56 2008
@@ -0,0 +1,36 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd   
+  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
+
+    <broker xmlns="http://activemq.apache.org/schema/core" start="false" brokerName="localhost" dataDirectory="target/data" useJmx="false">
+
+        <!-- The transport connectors ActiveMQ will listen to -->
+        <transportConnectors>
+            <transportConnector name="openwire" uri="tcp://localhost:61617"/>
+        </transportConnectors>
+
+    </broker>
+
+</beans>
+<!-- END SNIPPET: example -->

Added: activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml?rev=718695&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml (added)
+++ activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml Tue Nov 18 11:58:56 2008
@@ -0,0 +1,46 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:kdb="http://activemq.apache.org/schema/kahadb"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd   
+  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
+  http://activemq.apache.org/schema/kahadb http://activemq.apache.org/schema/kahadb/kahadb.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <kahadb-replication-broker xmlns="http://activemq.apache.org/schema/kahadb">
+  	<replicationService>
+	  <kahadb-replication
+    	directory="target/kaha-data-broker2" 
+    	brokerURI="xbean:broker2/ha-broker.xml" 
+    	uri="kdbr://localhost:6002">
+    	
+    	<cluster>
+    		<zookeeper-cluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>
+    	</cluster>
+    	
+      </kahadb-replication>
+  	</replicationService>
+  </kahadb-replication-broker>
+  
+</beans>
+<!-- END SNIPPET: example -->



Mime
View raw message