Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 23705 invoked from network); 18 Nov 2008 14:31:39 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 18 Nov 2008 14:31:39 -0000 Received: (qmail 13331 invoked by uid 500); 18 Nov 2008 14:31:45 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 13312 invoked by uid 500); 18 Nov 2008 14:31:45 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 13280 invoked by uid 99); 18 Nov 2008 14:31:44 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Nov 2008 06:31:44 -0800 X-ASF-Spam-Status: No, hits=-1998.8 required=10.0 tests=ALL_TRUSTED,FS_REPLICA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Nov 2008 14:30:21 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 904A62388855; Tue, 18 Nov 2008 06:30:36 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r718615 - in /activemq/sandbox/kahadb: ./ src/main/java/org/apache/kahadb/replication/ src/main/java/org/apache/kahadb/store/ src/test/java/org/apache/kahadb/replication/ src/test/java/org/apache/kahadb/store/perf/ Date: Tue, 18 Nov 2008 14:30:36 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081118143036.904A62388855@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Tue Nov 18 06:30:35 2008 New Revision: 718615 URL: http://svn.apache.org/viewvc?rev=718615&view=rev Log: The ReplicationServer now uses a BrokerFactory to create and destroy broker instances. This allows us to bring a master back online after it has been taken offline. Removed: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicatedBrokerService.java Modified: activemq/sandbox/kahadb/pom.xml activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java Modified: activemq/sandbox/kahadb/pom.xml URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/pom.xml?rev=718615&r1=718614&r2=718615&view=diff ============================================================================== --- activemq/sandbox/kahadb/pom.xml (original) +++ activemq/sandbox/kahadb/pom.xml Tue Nov 18 06:30:35 2008 @@ -89,6 +89,12 @@ true + org.springframework + spring-context + 2.5.5 + true + + org.apache.hadoop.zookeeper zookeeper 3.0.0 Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=718615&r1=718614&r2=718615&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Tue Nov 18 06:30:35 2008 @@ -69,7 +69,7 @@ public void start() throws Exception { synchronized (serverMutex) { - server = TransportFactory.bind(new URI(replicationServer.getNodeId())); + server = TransportFactory.bind(new URI(replicationServer.getUri())); server.setAcceptListener(new TransportAcceptListener() { public void onAccept(Transport transport) { try { Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=718615&r1=718614&r2=718615&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java Tue Nov 18 06:30:35 2008 @@ -23,6 +23,9 @@ import java.util.zip.Checksum; import org.apache.activemq.Service; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.IOHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.kahadb.page.PageFile; @@ -41,32 +44,46 @@ private KahaDBStore store; - private ReplicatedBrokerService brokerService; + private BrokerService brokerService; - public ReplicationServer() { - } - - public ReplicatedBrokerService getBrokerService() { - return brokerService; - } + private File directory = new File(IOHelper.getDefaultDataDirectory()); - public void setBrokerService(ReplicatedBrokerService brokerService) { - this.brokerService = brokerService; + public ReplicationServer() { } public KahaDBStore getStore() { + if( store == null ) { + store = new KahaDBStore(); + store.setDirectory(directory); + } return store; } - public void setStore(KahaDBStore store) { + public File getDirectory() { + return directory; + } + + public void setDirectory(File directory) { + this.directory = directory; + } + + public String getBrokerURI() { + return brokerURI; + } + + public void setBrokerURI(String brokerURI) { + this.brokerURI = brokerURI; + } + + public void setStore(KahaDBStore store) { this.store = store; } - public String getNodeId() { - return nodeId; + public String getUri() { + return uri; } - public void setNodeId(String nodeId) { - this.nodeId = nodeId; + public void setUri(String nodeId) { + this.uri = nodeId; } public ClusterStateManager getCluster() { @@ -78,7 +95,7 @@ } PageFile pageFile; - String nodeId; + String uri; ClusterStateManager cluster; ReplicationMaster master; @@ -88,9 +105,12 @@ private File tempReplicationDir; + private String brokerURI = "xbean:broker.xml"; + public void start() throws Exception { // The cluster will let us know about the cluster configuration, // which lets us decide if we are going to be a slave or a master. + getStore().open(); cluster.addListener(this); cluster.start(); } @@ -98,6 +118,7 @@ public void stop() throws Exception { cluster.removeListener(this); cluster.stop(); + getStore().close(); } public void onClusterChange(ClusterState clusterState) { @@ -110,10 +131,11 @@ LOG.info("Shutting down master due to cluster state change."); master.stop(); master = null; - // TODO: broker service does not support getting restarted once it's been stopped. :( - // so at this point we need, to re-create the broker if we want to go back into slave - // mode. - brokerService.stopMaster(); + brokerService.stop(); + brokerService=null; + // Stopping the broker service actually stops the store too.. + // so we need to open it back up. + getStore().open(); } // If the slave service was not yet started.. start it up. if (slave == null) { @@ -132,9 +154,10 @@ // If the master service was not yet started.. start it up. if (master == null) { LOG.info("Starting Master."); + brokerService = createBrokerService(); + brokerService.start(); master = new ReplicationMaster(this); master.start(); - brokerService.startMaster(); } master.onClusterChange(clusterState); @@ -159,16 +182,26 @@ } } - public ClusterState getClusterState() { + public BrokerService getBrokerService() { + return brokerService; + } + + private BrokerService createBrokerService() throws Exception { + BrokerService rc = BrokerFactory.createBroker(brokerURI ); + rc.setPersistenceAdapter(getStore()); + return rc; + } + + public ClusterState getClusterState() { return clusterState; } private boolean areWeTheSlave(ClusterState config) { - return config.getSlaves().contains(nodeId); + return config.getSlaves().contains(uri); } private boolean areWeTheMaster(ClusterState config) { - return nodeId.equals(config.getMaster()); + return uri.equals(config.getMaster()); } public File getReplicationFile(String fn) throws IOException { Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=718615&r1=718614&r2=718615&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Tue Nov 18 06:30:35 2008 @@ -99,8 +99,6 @@ return; } - replicationServer.getStore().open(); - transport = TransportFactory.connect(new URI(master)); transport.setTransportListener(this); transport.start(); @@ -111,7 +109,7 @@ ReplicationFrame frame = new ReplicationFrame(); frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT)); PBSlaveInit payload = new PBSlaveInit(); - payload.setNodeId(replicationServer.getNodeId()); + payload.setNodeId(replicationServer.getUri()); // This call back is executed once the checkpoint is // completed and all data has been @@ -199,8 +197,6 @@ journalUpateFile=null; } journalUpdateFileId=0; - - replicationServer.getStore().close(); } } @@ -305,7 +301,6 @@ synchronized (transferMutex) { LOG.info("Slave synhcronization complete, going online..."); - replicationServer.getStore().close(); if( journalUpateFile!=null ) { @@ -332,7 +327,6 @@ online=true; replicationServer.getStore().open(); - LOG.info("Slave is now online. We are now eligible to become the master."); } Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=718615&r1=718614&r2=718615&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Tue Nov 18 06:30:35 2008 @@ -301,15 +301,17 @@ public void unload() throws IOException, InterruptedException { synchronized (indexMutex) { - metadata.state = CLOSED_STATE; - metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); - - pageFile.tx().execute(new Transaction.Closure() { - public void execute(Transaction tx) throws IOException { - tx.store(metadata.page, metadataMarshaller, true); - } - }); - close(); + if( pageFile.isLoaded() ) { + metadata.state = CLOSED_STATE; + metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); + + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + tx.store(metadata.page, metadataMarshaller, true); + } + }); + close(); + } } } @@ -462,7 +464,11 @@ public JournalCommand load(Location location) throws IOException { ByteSequence data = journal.read(location); DataByteArrayInputStream is = new DataByteArrayInputStream(data); - KahaEntryType type = KahaEntryType.valueOf(is.readByte()); + byte readByte = is.readByte(); + KahaEntryType type = KahaEntryType.valueOf(readByte); + if( type == null ) { + throw new IOException("Could not load journal record. Invalid location: "+location); + } JournalCommand message = (JournalCommand)type.createMessage(); message.mergeFramed(is); return message; Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=718615&r1=718614&r2=718615&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (original) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Tue Nov 18 06:30:35 2008 @@ -16,6 +16,7 @@ */ package org.apache.kahadb.replication; +import java.io.File; import java.util.Arrays; import javax.jms.Connection; @@ -47,21 +48,19 @@ // This cluster object will control who becomes the master. StaticClusterStateManager cluster = new StaticClusterStateManager(); - ReplicatedBrokerService b1 = new ReplicatedBrokerService(); - b1.addConnector(BROKER1_URI); - b1.setDataDirectory("target/replication-test/broker1"); - b1.setBrokerName("broker1"); - b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID); - b1.getReplicationServer().setCluster(cluster); - b1.start(); - - ReplicatedBrokerService b2 = new ReplicatedBrokerService(); - b2.addConnector(BROKER2_URI); - b2.setDataDirectory("target/replication-test/broker2"); - b2.setBrokerName("broker2"); - b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID); - b2.getReplicationServer().setCluster(cluster); - b2.start(); + ReplicationServer rs1 = new ReplicationServer(); + rs1.setUri(BROKER1_REPLICATION_ID); + rs1.setCluster(cluster); + rs1.setDirectory(new File("target/replication-test/broker1")); + rs1.setBrokerURI("broker://("+BROKER1_URI+")/broker1"); + rs1.start(); + + ReplicationServer rs2 = new ReplicationServer(); + rs2.setUri(BROKER2_REPLICATION_ID); + rs2.setCluster(cluster); + rs2.setDirectory(new File("target/replication-test/broker2")); + rs2.setBrokerURI("broker://(" + BROKER2_URI + ")/broker2"); + rs2.start(); // // None of the brokers should be accepting connections since they are not masters. // try { @@ -108,8 +107,8 @@ assertReceived(200, BROKER2_URI); - b2.stop(); - b1.stop(); + rs2.stop(); + rs1.stop(); } Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java?rev=718615&r1=718614&r2=718615&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java (original) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java Tue Nov 18 06:30:35 2008 @@ -16,12 +16,13 @@ */ package org.apache.kahadb.store.perf; +import java.io.File; import java.util.Arrays; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.perf.SimpleQueueTest; import org.apache.kahadb.replication.ClusterState; -import org.apache.kahadb.replication.ReplicatedBrokerService; +import org.apache.kahadb.replication.ReplicationServer; import org.apache.kahadb.replication.StaticClusterStateManager; /** @@ -30,13 +31,13 @@ public class ReplicatedKahaStoreQueueTest extends SimpleQueueTest { private StaticClusterStateManager cluster; - private ReplicatedBrokerService b1; - private ReplicatedBrokerService b2; private static final String BROKER1_REPLICATION_ID = "kdbr://localhost:60001"; private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002"; protected String broker2BindAddress="tcp://localhost:61617"; + private ReplicationServer rs1; + private ReplicationServer rs2; @Override protected BrokerService createBroker(String uri) throws Exception { @@ -56,37 +57,33 @@ clusterState.setSlaves(Arrays.asList(slaves)); cluster.setClusterState(clusterState); - b1 = new ReplicatedBrokerService(); - b1.setDeleteAllMessagesOnStartup(true); - b1.addConnector(uri); - b1.setUseShutdownHook(false); - - b1.setDataDirectory("target/test-amq-data/perfTest-b1/amqdb"); - b1.setBrokerName("broker1"); - b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID); - b1.getReplicationServer().setCluster(cluster); - b1.start(); - - Thread.sleep(1000); - - b2 = new ReplicatedBrokerService(); - b2.addConnector(broker2BindAddress); - b2.setDataDirectory("target/test-amq-data/perfTest-b2/amqdb"); - b2.setBrokerName("broker1"); - b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID); - b2.getReplicationServer().setCluster(cluster); - b2.start(); + rs1 = new ReplicationServer(); + rs1.setUri(BROKER1_REPLICATION_ID); + rs1.setCluster(cluster); + rs1.setDirectory(new File("target/replication-test/broker1")); + rs1.setBrokerURI("broker://("+uri+")/broker1"); + rs1.start(); + + rs2 = new ReplicationServer(); + rs2.setUri(BROKER2_REPLICATION_ID); + rs2.setCluster(cluster); + rs2.setDirectory(new File("target/replication-test/broker2")); + rs2.setBrokerURI("broker://(" + broker2BindAddress + ")/broker2"); + rs2.start(); - - return b1; + return rs1.getBrokerService(); } @Override protected void tearDown() throws Exception { - if( b2!=null ) { - b2.stop(); - b2 = null; + if( rs1!=null ) { + rs1.stop(); + rs1 = null; } + if( rs2!=null ) { + rs2.stop(); + rs2 = null; + } } }