activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r713089 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/journal/ main/java/org/apache/kahadb/replication/ test/java/org/apache/kahadb/store/perf/
Date Tue, 11 Nov 2008 17:28:21 GMT
Author: chirino
Date: Tue Nov 11 09:28:19 2008
New Revision: 713089

URL: http://svn.apache.org/viewvc?rev=713089&view=rev
Log:
- Added a perf test to see the impact of using KahaDB replication.
- Other small fixes.

Added:
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java
Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=713089&r1=713088&r2=713089&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Tue Nov 11 09:28:19 2008
@@ -422,7 +422,23 @@
     }
 
 	public synchronized void appendedExternally(Location loc, int length) throws IOException {
-		DataFile dataFile = getDataFile(loc);
+		DataFile dataFile = null;
+		if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
+			// It's an update to the current log file..
+			dataFile = dataFiles.getTail();
+		} else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
+			// It's an update to the next log file.
+            int nextNum = loc.getDataFileId();
+            File file = getFile(nextNum);
+            dataFile = new DataFile(file, nextNum, preferedFileLength);
+            // actually allocate the disk space
+            fileMap.put(dataFile.getDataFileId(), dataFile);
+            fileByFileMap.put(file, dataFile);
+            dataFiles.addLast(dataFile);
+		} else {
+			throw new IOException("Invalid external append.");
+		}
+
 		dataFile.incrementLength(length);
 	}
 
@@ -433,6 +449,9 @@
             if (cur == null) {
                 if (location == null) {
                     DataFile head = dataFiles.getHead();
+                    if( head == null ) {
+                    	return null;
+                    }
                     cur = new Location();
                     cur.setDataFileId(head.getDataFileId());
                     cur.setOffset(0);

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=713089&r1=713088&r2=713089&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Tue Nov 11 09:28:19 2008
@@ -166,6 +166,7 @@
 					break;
 				}
 			} catch (Exception e) {
+				LOG.warn("Slave request failed: "+e, e);
 				failed(e);
 			}
 		}
@@ -286,6 +287,9 @@
 	}
 
 	private PBJournalLocation convert(Location loc) {
+		if( loc==null ) {
+			return null;
+		}
 		return new PBJournalLocation().setFileId(loc.getDataFileId()).setOffset(loc.getOffset());
 	}
 

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=713089&r1=713088&r2=713089&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Tue Nov 11 09:28:19 2008
@@ -166,7 +166,7 @@
 
 	public void failed(Exception error) {
 		try {
-			error.printStackTrace();
+			LOG.warn("Replication session fail to master: "+transport.getRemoteAddress(), error);
 			stop();
 		} catch (Exception ignore) {
 		}

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java?rev=713089&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java Tue Nov 11 09:28:19 2008
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store.perf;
+
+import java.util.Arrays;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.perf.SimpleQueueTest;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.ReplicatedBrokerService;
+import org.apache.kahadb.replication.StaticClusterStateManager;
+
+/**
+ * @version $Revision: 712224 $
+ */
+public class ReplicatedKahaStoreQueueTest extends SimpleQueueTest {
+
+	private StaticClusterStateManager cluster;
+	private ReplicatedBrokerService b1;
+	private ReplicatedBrokerService b2;
+	
+	private static final String BROKER1_REPLICATION_ID = "kdbr://localhost:60001";
+	private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002";
+
+    protected String broker2BindAddress="tcp://localhost:61617";
+
+	@Override
+	protected BrokerService createBroker(String uri) throws Exception {
+		
+	    clientURI="failover:(" +
+	    		"tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&wireFormat.maxInactivityDuration=50000" +
+	    		"," +
+	    		"tcp://localhost:61617?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&wireFormat.maxInactivityDuration=50000" +
+	    		")?jms.useAsyncSend=true";
+	    
+		// This cluster object will control who becomes the master.
+		cluster = new StaticClusterStateManager();
+
+		ClusterState clusterState = new ClusterState();
+		clusterState.setMaster(BROKER1_REPLICATION_ID);
+		String[] slaves = {BROKER2_REPLICATION_ID};
+		clusterState.setSlaves(Arrays.asList(slaves));
+		cluster.setClusterState(clusterState);
+
+		b1 = new ReplicatedBrokerService();
+        b1.setDeleteAllMessagesOnStartup(true);
+        b1.addConnector(uri);
+        b1.setUseShutdownHook(false);
+
+		b1.setDataDirectory("target/test-amq-data/perfTest-b1/amqdb");
+		b1.setBrokerName("broker1");
+		b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID);
+		b1.getReplicationServer().setCluster(cluster);
+		b1.start();
+		
+		Thread.sleep(1000);
+		
+		b2 = new ReplicatedBrokerService();
+		b2.addConnector(broker2BindAddress);
+		b2.setDataDirectory("target/test-amq-data/perfTest-b2/amqdb");
+		b2.setBrokerName("broker1");
+		b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID);
+		b2.getReplicationServer().setCluster(cluster);
+		b2.start();
+
+		
+		return b1;
+	}
+	
+	@Override
+	protected void tearDown() throws Exception {
+		if( b2!=null ) {
+			b2.stop();
+			b2 = null;
+		}
+	}
+	
+}
+



Mime
View raw message