activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r712217 [1/2] - in /activemq/sandbox/kahadb: ./ src/main/java/org/apache/kahadb/ha/ src/main/java/org/apache/kahadb/ha/command/ src/main/java/org/apache/kahadb/journal/ src/main/java/org/apache/kahadb/page/ src/main/java/org/apache/kahadb/r...
Date Fri, 07 Nov 2008 18:16:45 GMT
Author: chirino
Date: Fri Nov  7 10:16:10 2008
New Revision: 712217

URL: http://svn.apache.org/viewvc?rev=712217&view=rev
Log:
Started impl of a replication protocol for the KahaDB.

Added:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/ha/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/ha/command/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterListener.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterState.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicatedBrokerService.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
    activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
    activemq/sandbox/kahadb/src/main/resources/META-INF/
    activemq/sandbox/kahadb/src/main/resources/META-INF/services/
    activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/
    activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/
    activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/
    activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/
    activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr
    activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/
    activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java
Removed:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreBrokerTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreXARecoveryBrokerTest.java
Modified:
    activemq/sandbox/kahadb/pom.xml
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java

Modified: activemq/sandbox/kahadb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/pom.xml?rev=712217&r1=712216&r2=712217&view=diff
==============================================================================
--- activemq/sandbox/kahadb/pom.xml (original)
+++ activemq/sandbox/kahadb/pom.xml Fri Nov  7 10:16:10 2008
@@ -68,7 +68,7 @@
     <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-core</artifactId>
-      <version>5.2-SNAPSHOT</version>
+      <version>5.3-SNAPSHOT</version>
     </dependency>
     <dependency>
       <groupId>org.apache.xbean</groupId>
@@ -92,7 +92,7 @@
     <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-core</artifactId>
-      <version>5.2-SNAPSHOT</version>
+      <version>5.3-SNAPSHOT</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java?rev=712217&r1=712216&r2=712217&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java Fri Nov  7 10:16:10 2008
@@ -41,7 +41,7 @@
         length = (int)(file.exists() ? file.length() : 0);
     }
     
-    File getFile() {
+    public File getFile() {
         return file;
     }
 

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=712217&r1=712216&r2=712217&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Fri Nov  7 10:16:10 2008
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -258,8 +259,7 @@
         if (dataFiles.isEmpty()|| ((dataFiles.getTail().getLength() + location.getSize()) > maxFileLength)) {
             int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
 
-            String fileName = filePrefix + nextNum;
-            File file = new File(directory, fileName);
+            File file = getFile(nextNum);
             DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
             // actually allocate the disk space
             fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
@@ -275,6 +275,12 @@
         return currentWriteFile;
     }
 
+	public File getFile(int nextNum) {
+		String fileName = filePrefix + nextNum;
+		File file = new File(directory, fileName);
+		return file;
+	}
+
     synchronized DataFile getDataFile(Location item) throws IOException {
         Integer key = Integer.valueOf(item.getDataFileId());
         DataFile dataFile = fileMap.get(key);
@@ -632,6 +638,10 @@
         return fileByFileMap.keySet();
     }
 
+    public Map<Integer, DataFile> getFileMap() {
+        return new TreeMap<Integer, DataFile>(fileMap);
+    }
+    
     public long getDiskSize() {
         long tailLength=0;
         synchronized( this ) {

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=712217&r1=712216&r2=712217&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Fri Nov  7 10:16:10 2008
@@ -1088,4 +1088,8 @@
         }
     }
 
+	public File getFile() {
+		return getMainPageFile();
+	}
+
 }

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterListener.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterListener.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterListener.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+public interface ClusterListener {
+	
+	public void onClusterChange(ClusterState config);
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterState.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterState.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterState.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ClusterState {
+
+	private List<String> slaves = new ArrayList<String>();
+	private String master;
+	
+	public List<String> getSlaves() {
+		return slaves;
+	}
+	public void setSlaves(List<String> slaves) {
+		this.slaves = slaves;
+	}
+	public String getMaster() {
+		return master;
+	}
+	public void setMaster(String master) {
+		this.master = master;
+	}
+	
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import org.apache.activemq.Service;
+
+public interface ClusterStateManager extends Service {
+
+	void addListener(ClusterListener listener);
+	void removeListener(ClusterListener listener);
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicatedBrokerService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicatedBrokerService.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicatedBrokerService.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicatedBrokerService.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.kahadb.store.KahaDBStore;
+
+/**
+ *  
+ * @author chirino
+ */
+public class ReplicatedBrokerService extends BrokerService {
+
+	ReplicationServer replicationServer = new ReplicationServer();
+	
+	public ReplicationServer getReplicationServer() {
+		return replicationServer;
+	}
+
+	public void setReplicationServer(ReplicationServer replicationServer) {
+		this.replicationServer = replicationServer;
+	}
+
+	@Override
+	public void start() throws Exception {
+		replicationServer.setBrokerService(this);
+		replicationServer.setStore((KahaDBStore) getPersistenceAdapter());
+		replicationServer.start();
+	}
+	
+	@Override
+	public void stop() throws Exception {
+		replicationServer.stop();
+	}
+	
+	
+	public void startMaster() throws Exception {
+		super.start();
+	}
+
+	public void stopMaster() throws Exception {
+		super.stop();
+	}
+
+	@Override
+	protected PersistenceAdapter createPersistenceAdapter() throws IOException {
+        if (isPersistent()) {
+        	KahaDBStore rc = new KahaDBStore();
+        	rc.setBrokerName(getBrokerName());
+        	rc.setDirectory(this.getDataDirectoryFile());
+			return rc;
+        } else {
+        	throw new IOException("The Replicated Broker Service does not support disabling persistence");
+        }
+	}
+	
+	@Override
+	public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
+		if( !(persistenceAdapter instanceof KahaDBStore) ) {
+			throw new IOException("The Replicated Broker Service only supports the KahaDBStore PersistenceAdapter");
+		}
+		super.setPersistenceAdapter(persistenceAdapter);
+	}
+	
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import org.apache.kahadb.replication.pb.PBHeader;
+
+public class ReplicationFrame {
+	
+	PBHeader header;
+	Object payload;
+	
+	public PBHeader getHeader() {
+		return header;
+	}
+	public void setHeader(PBHeader header) {
+		this.header = header;
+	}
+	
+	public Object getPayload() {
+		return payload;
+	}
+	public void setPayload(Object payload) {
+		this.payload = payload;
+	}
+	
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.util.Callback;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.journal.DataFile;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.replication.pb.PBFileInfo;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
+import org.apache.kahadb.replication.pb.PBType;
+import org.apache.kahadb.store.KahaDBStore;
+
+public class ReplicationMaster implements Service, ClusterListener {
+
+	private static final Log LOG = LogFactory.getLog(ReplicationServer.class);
+
+	private final ReplicationServer replicationServer;
+
+	private Object serverMutex = new Object() {
+	};
+	private TransportServer server;
+	private ArrayList<ReplicationSession> sessions = new ArrayList<ReplicationSession>();
+
+	public ReplicationMaster(ReplicationServer replication1Server) {
+		this.replicationServer = replication1Server;
+	}
+
+	public void start() throws Exception {
+		synchronized (serverMutex) {
+			server = TransportFactory.bind(new URI(replicationServer.getNodeId()));
+			server.setAcceptListener(new TransportAcceptListener() {
+				public void onAccept(Transport transport) {
+					try {
+						synchronized (serverMutex) {
+							ReplicationSession session = new ReplicationSession(transport);
+							session.start();
+							sessions.add(session);
+						}
+					} catch (Exception e) {
+						LOG.info("Could not accept replication connection from slave at " + transport.getRemoteAddress() + ", due to: " + e, e);
+					}
+				}
+
+				public void onAcceptError(Exception e) {
+					LOG.info("Could not accept replication connection: " + e, e);
+				}
+			});
+			server.start();
+		}
+	}
+
+	public void stop() throws Exception {
+		synchronized (serverMutex) {
+			if (server != null) {
+				server.stop();
+				server = null;
+			}
+		}
+	}
+
+	public void onClusterChange(ClusterState config) {
+		// TODO: if a slave is removed from the cluster, we should
+		// remove its replication tracking info.
+	}
+
+	class ReplicationSession implements Service, TransportListener {
+
+		private final Transport transport;
+		private final AtomicBoolean subscribedToJournalUpdates = new AtomicBoolean();
+
+		public ReplicationSession(Transport transport) {
+			this.transport = transport;
+		}
+
+		public void start() throws Exception {
+			transport.setTransportListener(this);
+			transport.start();
+		}
+
+		public void stop() throws Exception {
+			transport.stop();
+		}
+
+		public void onCommand(Object command) {
+			try {
+				ReplicationFrame frame = (ReplicationFrame) command;
+				switch (frame.getHeader().getType()) {
+				case SLAVE_INIT:
+					subscribedToJournalUpdates.set(true);
+					onSlaveInit(frame, (PBSlaveInit) frame.getPayload());
+					break;
+				case FILE_TRANSFER:
+					onFileTransfer(frame, (PBFileInfo) frame.getPayload());
+					break;
+				case JOURNAL_UPDATE_ACK:
+					onJournalUpdateAck(frame, (PBJournalLocation) frame.getPayload());
+					break;
+				}
+			} catch (Exception e) {
+				failed(e);
+			}
+		}
+
+		public void onException(IOException error) {
+			failed(error);
+		}
+
+		public void failed(Exception error) {
+			try {
+				stop();
+			} catch (Exception ignore) {
+			}
+		}
+
+		public void transportInterupted() {
+		}
+		public void transportResumed() {
+		}
+		
+		private void onSlaveInit(ReplicationFrame frame, PBSlaveInit slaveInit) throws Exception {
+
+			// We could look at the slave state sent in the slaveInit and decide
+			// that a full sync is not needed..
+			// but for now we will do a full sync every time.
+			ReplicationFrame rc = new ReplicationFrame();
+			final PBSlaveInitResponse rcPayload = new PBSlaveInitResponse();
+			rc.setHeader(new PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
+			rc.setPayload(rcPayload);
+			
+			final KahaDBStore store = replicationServer.getStore();
+			store.checkpoint(new Callback() {
+				public void execute() throws Exception {
+					// This call back is executed once the checkpoint is
+					// completed and all data has been
+					// synced to disk, but while a lock is still held on the
+					// store so that no
+					// updates are allowed.
+
+					ArrayList<PBFileInfo> infos = new ArrayList<PBFileInfo>();
+					SnapshotStatus snapshot = createSnapshot();
+					PBFileInfo databaseInfo = new PBFileInfo();
+					databaseInfo.setName("database");
+					databaseInfo.setSnapshotId(snapshot.id);
+					databaseInfo.setStart(0);
+					databaseInfo.setEnd(snapshot.size);
+					databaseInfo.setChecksum(snapshot.checksum);
+					infos.add(databaseInfo);
+
+					Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
+					for (DataFile df : journalFiles.values()) {
+						infos.add(createInfo("journal/" + df.getDataFileId(), df.getFile(), df.getLength()));
+					}
+					rcPayload.setCopyFilesList(infos);
+				}
+			});
+			
+			transport.oneway(rc);
+		}
+
+		private void onFileTransfer(ReplicationFrame frame, PBFileInfo fileInfo) throws IOException {
+			File file = replicationServer.getReplicationFile(fileInfo.getName());
+			long payloadSize = fileInfo.getEnd()-fileInfo.getStart();
+			
+			if( file.length() < fileInfo.getStart()+payloadSize ) {
+				throw new IOException("Requested replication file does not have enough data.");
+			}		
+			
+			ReplicationFrame rc = new ReplicationFrame();
+			rc.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(payloadSize));
+			
+			FileInputStream is = new FileInputStream(file);
+			rc.setPayload(is);
+			try {
+				is.skip(fileInfo.getStart());
+				transport.oneway(rc);
+			} finally {
+				try {
+					is.close();
+				} catch (Throwable e) {
+				}
+			}
+		}
+
+	}
+
+	static class SlaveStatus {
+		String salve_id;
+		PBJournalLocation lastAck;
+		Integer syncingSnapshot;
+	}
+
+	static class SnapshotStatus {
+		int id;
+		File file;
+		long checksum;
+		PBJournalLocation lastJournalLocation;
+		long size;
+	}
+
+
+
+	int nextSnapshotId;
+	SnapshotStatus currentSnapshot;
+	private SnapshotStatus createSnapshot() throws IOException {
+		if (currentSnapshot == null) {
+			currentSnapshot = new SnapshotStatus();
+			currentSnapshot.id = nextSnapshotId++;
+			KahaDBStore store = replicationServer.getStore();
+			File file = store.getPageFile().getFile();
+			currentSnapshot.file = new File(file.getParentFile(), "snapshot-" + currentSnapshot.id);
+			currentSnapshot.checksum = copyAndChecksum(file, currentSnapshot.file);
+			currentSnapshot.lastJournalLocation = convert(store.getJournal().getLastAppendLocation());
+			currentSnapshot.size = currentSnapshot.file.length();
+		}
+		return currentSnapshot;
+	}
+
+	private PBJournalLocation convert(Location loc) {
+		return new PBJournalLocation().setFileId(loc.getDataFileId()).setOffset(loc.getOffset());
+	}
+
+	private long copyAndChecksum(File input, File output) throws IOException {
+		FileInputStream is = null;
+		FileOutputStream os = null;
+		try {
+			is = new FileInputStream(input);
+			os = new FileOutputStream(output);
+
+			byte buffer[] = new byte[1024 * 4];
+			int c;
+
+			Checksum checksum = new Adler32();
+			while ((c = is.read(buffer)) >= 0) {
+				os.write(buffer, 0, c);
+				checksum.update(buffer, 0, c);
+			}
+			return checksum.getValue();
+
+		} finally {
+			try {
+				is.close();
+			} finally {
+			}
+			try {
+				os.close();
+			} finally {
+			}
+		}
+	}
+
+	private PBFileInfo createInfo(String name, File file, int length) throws IOException {
+		PBFileInfo rc = new PBFileInfo();
+		rc.setName(name);
+		FileInputStream is = new FileInputStream(file);
+		byte buffer[] = new byte[1024 * 4];
+		int c;
+
+		long size = 0;
+		Checksum checksum = new Adler32();
+		while (size < length && (c = is.read(buffer, 0, (int) Math.min(length - size, buffer.length))) >= 0) {
+			checksum.update(buffer, 0, c);
+			size += c;
+		}
+		rc.setChecksum(checksum.getValue());
+		rc.setStart(0);
+		rc.setEnd(size);
+		return rc;
+	}
+
+	private void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation journalLocation) {
+	}
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activemq.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.page.PageFile;
+import org.apache.kahadb.store.KahaDBStore;
+
+/**
+ * Handles interfacing with the ClusterStateManager and handles activating the slave or master facets of
+ * the broker.
+ * 
+ * @author chirino
+ */
+public class ReplicationServer implements Service, ClusterListener {
+
+    private static final Log LOG = LogFactory.getLog(ReplicationServer.class);
+
+    private KahaDBStore store;
+
+	private ReplicatedBrokerService brokerService;
+
+	public ReplicationServer() {
+	}
+
+	public ReplicatedBrokerService getBrokerService() {
+		return brokerService;
+	}
+
+	public void setBrokerService(ReplicatedBrokerService brokerService) {
+		this.brokerService = brokerService;
+	}
+
+	public KahaDBStore getStore() {
+		return store;
+	}
+
+	public void setStore(KahaDBStore store) {
+		this.store = store;
+	}
+
+	public String getNodeId() {
+		return nodeId;
+	}
+
+	public void setNodeId(String nodeId) {
+		this.nodeId = nodeId;
+	}
+
+	public ClusterStateManager getCluster() {
+		return cluster;
+	}
+
+	public void setCluster(ClusterStateManager cluster) {
+		this.cluster = cluster;
+	}
+
+	PageFile pageFile;
+	String nodeId;
+	ClusterStateManager cluster;
+
+	ReplicationMaster master;
+	ReplicationSlave slave;
+
+	private ClusterState clusterState;
+
+	public void start() throws Exception {
+		// The cluster will let us know about the cluster configuration,
+		// which lets us decide if we are going to be a slave or a master.
+		cluster.addListener(this);
+		cluster.start();
+	}
+
+	public void stop() throws Exception {
+		cluster.removeListener(this);
+		cluster.stop();
+	}
+
+	/**
+	 * Reacts to a cluster configuration change by switching this node
+	 * between master, slave, or inactive roles as needed.  Synchronized on
+	 * the cluster so role transitions do not interleave.
+	 */
+	public void onClusterChange(ClusterState clusterState) {
+		this.clusterState = clusterState;
+		try {
+			synchronized (cluster) {
+				if (areWeTheSlave(clusterState)) {
+					// If we were the master we need to stop the master service..
+					if (master != null) {
+						LOG.info("Shutting down master due to cluster state change.");
+						master.stop();
+						master = null;
+						// TODO: broker service does not support getting restarted once it's been stopped. :(
+						// so at this point we need, to re-create the broker if we want to go back into slave 
+						// mode.
+						brokerService.stopMaster();
+					}
+					// If the slave service was not yet started.. start it up.
+					if (slave == null) {
+						LOG.info("Starting replication slave.");
+						slave = new ReplicationSlave(this);
+						slave.start();
+					}
+					slave.onClusterChange(clusterState);
+				} else if (areWeTheMaster(clusterState)) {
+					// If we were the slave we need to stop the slave service..
+					if (slave != null) {
+						LOG.info("Switching from Slave to Master.");
+						slave.stop();
+						slave = null;
+					}
+					// If the master service was not yet started.. start it up.
+					if (master == null) {
+						LOG.info("Starting Master.");
+						master = new ReplicationMaster(this);
+						master.start();
+						brokerService.startMaster();
+					}
+					
+					master.onClusterChange(clusterState);					
+				} else {
+					// We were not part of the configuration (not master nor slave).
+					// So we have to shutdown any running master or slave services that may
+					// have been running.
+					if (master != null) {
+						LOG.info("Stopping master.. we were removed from the HA cluster.");
+						master.stop();
+						master = null;
+					}
+					if (slave != null) {
+						LOG.info("Stopping slave.. we were removed from the HA cluster.");
+						slave.stop();
+						slave = null;
+					}					
+				}
+			}
+		} catch (Exception e) {
+			LOG.warn("Unexpected Error: "+e, e);
+		}
+	}
+
+	public ClusterState getClusterState() {
+		return clusterState;
+	}
+
+	private boolean areWeTheSlave(ClusterState config) {
+		return config.getSlaves().contains(nodeId);
+	}
+	
+	private boolean areWeTheMaster(ClusterState config) {
+		return nodeId.equals(config.getMaster());
+	}
+
+	/**
+	 * Resolves a logical replication file name ("database" or
+	 * "journal/&lt;id&gt;") to the concrete file on disk.
+	 *
+	 * @throws IOException if the name is not a recognized replication file
+	 */
+	public File getReplicationFile(String fn) throws IOException {
+		if (fn.equals("database")) {
+			return getStore().getPageFile().getFile();
+		// Was "} if (...)": a dangling second if that only worked because the
+		// first branch returns.  Made the chain an explicit else-if.
+		} else if (fn.startsWith("journal/")) {
+			int id;
+			try {
+				id = Integer.parseInt(fn.substring("journal/".length()));
+			} catch (NumberFormatException e) {
+				throw new IOException("Unknown replication file name: "+fn);
+			}
+			return getStore().getJournal().getFile(id);
+		} else {
+			throw new IOException("Unknown replication file name: "+fn);
+		}
+	}
+
+	public boolean isMaster() {
+		return master!=null;
+	}
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.replication.pb.PBFileInfo;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
+import org.apache.kahadb.replication.pb.PBType;
+
+public class ReplicationSlave implements Service, ClusterListener, TransportListener {
+	private static final Log LOG = LogFactory.getLog(ReplicationSlave.class);
+
+	private final ReplicationServer replicationServer;
+	private Transport transport;
+
+	public ReplicationSlave(ReplicationServer replicationServer) {
+		this.replicationServer = replicationServer;
+	}
+
+	/**
+	 * Connects to the master and sends the SLAVE_INIT handshake.
+	 */
+	public void start() throws Exception {
+		transport = TransportFactory.connect(new URI(replicationServer.getClusterState().getMaster()));
+		transport.setTransportListener(this);
+		transport.start();
+		
+		ReplicationFrame frame = new ReplicationFrame();
+		frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
+		PBSlaveInit payload = new PBSlaveInit();
+		payload.setNodeId(replicationServer.getNodeId());
+		frame.setPayload(payload);
+		LOG.info("Sending master slave init command: "+payload);
+		transport.oneway(frame);
+		
+	}
+
+	/**
+	 * Disconnects from the master.  Previously a no-op, which leaked the
+	 * transport connection on shutdown.
+	 */
+	public void stop() throws Exception {
+		if (transport != null) {
+			transport.stop();
+			transport = null;
+		}
+	}
+
+	public void onClusterChange(ClusterState config) {
+	}
+
+	public void onCommand(Object command) {
+		try {
+			ReplicationFrame frame = (ReplicationFrame) command;
+			switch (frame.getHeader().getType()) {
+			case SLAVE_INIT_RESPONSE:
+				onSlaveInitResponse(frame, (PBSlaveInitResponse) frame.getPayload());
+				break;
+			}
+		} catch (Exception e) {
+			failed(e);
+		}
+	}
+
+	public void onException(IOException error) {
+		failed(error);
+	}
+
+	public void failed(Exception error) {
+		try {
+			// Log via the class logger rather than printStackTrace().
+			LOG.warn("Replication slave connection failure: "+error, error);
+			stop();
+		} catch (Exception ignore) {
+		}
+	}
+
+	public void transportInterupted() {
+	}
+
+	public void transportResumed() {
+	}
+
+	// Guards transferQueue and transferSessions.
+	private Object transferMutex = new Object();
+	private LinkedList<PBFileInfo> transferQueue = new LinkedList<PBFileInfo>();
+
+	private void onSlaveInitResponse(ReplicationFrame frame, PBSlaveInitResponse response) throws Exception {
+		LOG.info("Got init response: "+response);
+		delete(response.getDeleteFilesList());
+		synchronized(transferMutex) {
+			transferQueue.clear();
+			transferQueue.addAll(response.getCopyFilesList());
+		}
+		addTransferSession();
+	}
+
+		
+	private PBFileInfo dequeueTransferQueue() throws Exception {
+		synchronized( transferMutex ) {
+			if( transferQueue.isEmpty() ) {
+				return null;
+			}
+			return transferQueue.removeFirst();
+		}
+	}
+	
+	LinkedList<TransferSession> transferSessions = new LinkedList<TransferSession>();
+	
+	/**
+	 * Spins up transfer sessions (up to 5 concurrent) while work remains on
+	 * the transfer queue.
+	 */
+	private void addTransferSession() throws Exception {
+		synchronized( transferMutex ) {
+			while( !transferQueue.isEmpty() && transferSessions.size()<5 ) {
+				TransferSession transferSession = new TransferSession();
+				transferSessions.add(transferSession);
+				try {
+					transferSession.start();
+				} catch (Exception e) {
+					transferSessions.remove(transferSession);
+				}
+			}
+		}
+	}
+	
+	/**
+	 * One master connection that pulls files off the transfer queue, one at
+	 * a time, until the queue is drained.
+	 */
+	class TransferSession implements Service, TransportListener {
+		
+		Transport transport;
+		private PBFileInfo info;
+		private File toFile;
+		private AtomicBoolean stopped = new AtomicBoolean();
+		private long transferStart;
+
+		public void start() throws Exception {
+			LOG.info("File transfer session started.");
+			transport = TransportFactory.connect(new URI(replicationServer.getClusterState().getMaster()));
+			transport.setTransportListener(this);
+			transport.start();
+			sendNextRequestOrStop();
+		}
+
+		private void sendNextRequestOrStop() {
+			try {
+				PBFileInfo info = dequeueTransferQueue();
+				if( info !=null ) {
+				
+					toFile = replicationServer.getReplicationFile(info.getName());
+					this.info = info;
+					
+					ReplicationFrame frame = new ReplicationFrame();
+					frame.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER));
+					frame.setPayload(info);
+					
+					LOG.info("Requesting file: "+info.getName());
+					transferStart = System.currentTimeMillis();
+					
+					transport.oneway(frame);
+				} else {
+					stop();
+				}
+				
+			} catch ( Exception e ) {
+				failed(e);
+			}
+		}
+
+		public void stop() throws Exception {
+			if( stopped.compareAndSet(false, true) ) {
+				LOG.info("File transfer session stopped.");
+				synchronized( transferMutex ) {
+					// Requeue an in-flight file so another session can pick it up.
+					if( info!=null ) {
+						transferQueue.addLast(info);
+					}
+					info = null;
+				}
+				Thread stopThread = new Thread("Transfer Session Shutdown: "+transport.getRemoteAddress()) {
+					@Override
+					public void run() {
+						try {
+							transport.stop();
+							synchronized( transferMutex ) {
+								// BUG FIX: 'this' here is the anonymous Thread, so the
+								// session was never removed from transferSessions; use
+								// the enclosing instance instead.
+								transferSessions.remove(TransferSession.this);
+								addTransferSession();
+							}
+						} catch (Exception e) {
+							LOG.warn("Error shutting down transfer session: "+e, e);
+						}
+					}
+				};
+				stopThread.setDaemon(true);
+				stopThread.start();
+			}
+		}
+
+		public void onCommand(Object command) {
+			try {
+				ReplicationFrame frame = (ReplicationFrame) command;
+				InputStream is = (InputStream) frame.getPayload();		
+				toFile.getParentFile().mkdirs();
+				FileOutputStream os = new FileOutputStream( toFile );
+				try {
+					copy(is, os, frame.getHeader().getPayloadSize());
+					long transferTime = System.currentTimeMillis()-this.transferStart;
+					float rate = frame.getHeader().getPayloadSize()*transferTime/1024000f;
+					LOG.info("File "+info.getName()+" transfered in "+transferTime+" (ms) at "+rate+" Kb/Sec");
+				} finally {
+					os.close();
+				}
+				this.info = null;
+				this.toFile = null;
+				
+				sendNextRequestOrStop();				
+			} catch (Exception e) {
+				failed(e);
+			}
+		}
+
+		public void onException(IOException error) {
+			failed(error);
+		}
+
+		public void failed(Exception error) {
+			try {
+				if( !stopped.get() ) {
+					LOG.warn("Replication session failure: "+transport.getRemoteAddress());
+				}
+				stop();
+			} catch (Exception ignore) {
+			}
+		}
+
+		public void transportInterupted() {
+		}
+		public void transportResumed() {
+		}
+
+	}
+	
+	/**
+	 * Copies exactly {@code length} bytes (or until EOF) from {@code is}
+	 * to {@code os}.
+	 */
+	private void copy(InputStream is, OutputStream os, long length) throws IOException {
+		byte buffer[] = new byte[1024 * 4];
+		int c=0;
+		long pos=0;
+		while ( pos <length && ((c = is.read(buffer, 0, (int)Math.min(buffer.length, length-pos))) >= 0) ) {
+			os.write(buffer, 0, c);
+			pos+=c;
+		}
+	}
+
+	private void delete(List<String> files) {
+		for (String fn : files) {
+			try {
+				replicationServer.getReplicationFile(fn).delete();
+			} catch (IOException e) {
+				// Best effort: a name we cannot resolve simply is not deleted,
+				// but no longer silently — log it for diagnosis.
+				LOG.debug("Could not resolve replication file to delete: "+fn, e);
+			}
+		}
+	}
+
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/StaticClusterStateManager.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.util.ArrayList;
+
+public class StaticClusterStateManager implements ClusterStateManager {
+
+	final private ArrayList<ClusterListener> listeners = new ArrayList<ClusterListener>();
+	private ClusterState clusterState;
+	private int startCounter;
+
+	synchronized public ClusterState getClusterState() {
+		return clusterState;
+	}
+
+	synchronized public void setClusterState(ClusterState clusterState) {
+		this.clusterState = clusterState;
+		fireClusterChange();
+	}
+
+	synchronized public void addListener(ClusterListener listener) {
+		listeners.add(listener);
+		fireClusterChange();
+	}
+
+	synchronized public void removeListener(ClusterListener listener) {
+		listeners.remove(listener);
+	}
+
+	synchronized public void start() throws Exception {
+		startCounter++;
+		fireClusterChange();
+	}
+
+	synchronized private void fireClusterChange() {
+		if( startCounter>0 && !listeners.isEmpty() && clusterState!=null ) {
+			for (ClusterListener listener : listeners) {
+				listener.onClusterChange(clusterState);
+			}
+		}
+	}
+
+	synchronized public void stop() throws Exception {
+		startCounter--;
+	}
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import java.util.Map;
+
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Transport factory for the KahaDB replication (KDBR) protocol, layered on
+ * TCP.  (The previous Javadoc said "STOMP transport factory" — a copy-paste
+ * leftover; this factory has nothing to do with STOMP.)
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class KDBRTransportFactory extends TcpTransportFactory {
+
+    protected String getDefaultWireFormatType() {
+        return "kdbr";
+    }
+
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+        IntrospectionSupport.setProperties(transport, options);
+        return super.compositeConfigure(transport, format, options);
+    }
+
+    // Inactivity monitoring is disabled; long file transfers would otherwise
+    // look like dead connections — presumably intentional, TODO confirm.
+    protected boolean isUseInactivityMonitor(Transport transport) {
+        return false;
+    }
+    
+    /**
+     * Override to remove the correlation transport filter since that relies on Command to 
+     * multiplex multiple requests and this protocol does not support that.
+     */
+    public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
+        transport = compositeConfigure(transport, wf, options);
+        transport = new MutexTransport(transport);
+
+        return transport;
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.activemq.protobuf.Message;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.kahadb.replication.ReplicationFrame;
+import org.apache.kahadb.replication.pb.PBFileInfo;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBJournalUpdate;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+public class KDBRWireFormat implements WireFormat {
+
+	private int version;
+
+	public int getVersion() {
+		return version;
+	}
+
+	public void setVersion(int version) {
+		this.version = version;
+	}
+
+	public ByteSequence marshal(Object command) throws IOException {
+		throw new RuntimeException("Not implemented.");
+	}
+
+	public Object unmarshal(ByteSequence packet) throws IOException {
+		throw new RuntimeException("Not implemented.");
+	}
+
+	public void marshal(Object command, DataOutput out) throws IOException {
+		OutputStream os = (OutputStream) out;
+		ReplicationFrame frame = (ReplicationFrame) command;
+		PBHeader header = frame.getHeader();
+		switch (frame.getHeader().getType()) {
+		case FILE_TRANSFER_RESPONSE: {
+			// Write the header..
+			header.writeFramed(os);
+			// Stream the Payload.
+			InputStream is = (InputStream) frame.getPayload();
+			byte data[] = new byte[1024 * 4];
+			int c;
+			long remaining = frame.getHeader().getPayloadSize();
+			while (remaining > 0 && (c = is.read(data, 0, (int) Math.min(remaining, data.length))) >= 0) {
+				os.write(data, 0, c);
+				remaining -= c;
+			}
+			break;
+		}
+		default:
+			if (frame.getPayload() == null) {
+				header.clearPayloadSize();
+				header.writeFramed(os);
+			} else {
+				// All other payloads types are PB messages
+				Message message = (Message) frame.getPayload();
+				header.setPayloadSize(message.serializedSizeUnframed());
+				header.writeFramed(os);
+				message.writeUnframed(os);
+			}
+		}
+	}
+
+	public Object unmarshal(DataInput in) throws IOException {
+		InputStream is = (InputStream) in;
+		ReplicationFrame frame = new ReplicationFrame();
+		frame.setHeader(PBHeader.parseFramed(is));
+		switch (frame.getHeader().getType()) {
+		case FILE_TRANSFER_RESPONSE:
+			frame.setPayload(is);
+			break;
+		case FILE_TRANSFER:
+			readPBPayload(frame, in, new PBFileInfo());
+			break;
+		case JOURNAL_UPDATE:
+			readPBPayload(frame, in, new PBJournalUpdate());
+			break;
+		case JOURNAL_UPDATE_ACK:
+			readPBPayload(frame, in, new PBJournalLocation());
+			break;
+		case SLAVE_INIT:
+			readPBPayload(frame, in, new PBSlaveInit());
+			break;
+		case SLAVE_INIT_RESPONSE:
+			readPBPayload(frame, in, new PBSlaveInitResponse());
+			break;
+		}
+		return frame;
+	}
+
+	private void readPBPayload(ReplicationFrame frame, DataInput in, Message pb) throws IOException, InvalidProtocolBufferException {
+		long payloadSize = frame.getHeader().getPayloadSize();
+		byte[] payload;
+		payload = new byte[(int)payloadSize];
+		in.readFully(payload);
+		frame.setPayload(pb.mergeUnframed(payload));
+	}
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * @version $Revision$
+ */
+public class KDBRWireFormatFactory implements WireFormatFactory {
+
+	public WireFormat createWireFormat() {
+		return new KDBRWireFormat();
+	}
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.store.data.KahaAddMessageCommand;
+import org.apache.kahadb.store.data.KahaCommitCommand;
+import org.apache.kahadb.store.data.KahaDestination;
+import org.apache.kahadb.store.data.KahaLocalTransactionId;
+import org.apache.kahadb.store.data.KahaLocation;
+import org.apache.kahadb.store.data.KahaPrepareCommand;
+import org.apache.kahadb.store.data.KahaRemoveDestinationCommand;
+import org.apache.kahadb.store.data.KahaRemoveMessageCommand;
+import org.apache.kahadb.store.data.KahaRollbackCommand;
+import org.apache.kahadb.store.data.KahaSubscriptionCommand;
+import org.apache.kahadb.store.data.KahaTransactionInfo;
+import org.apache.kahadb.store.data.KahaXATransactionId;
+import org.apache.kahadb.store.data.KahaDestination.DestinationType;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * A {@link PersistenceAdapter} implementation backed by the KahaDB message
+ * database.  Store commands (add/remove message, commit, rollback, ...) are
+ * marshalled as protobuf records and appended to the journal via
+ * {@code store(...)}; lookups go through the page-file index guarded by
+ * {@code indexMutex} (the page file does not yet support concurrent reads,
+ * so every index access is externally synchronized).
+ */
+public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
+
+    // Used to marshal/unmarshal ActiveMQ commands (Message, MessageAck,
+    // SubscriptionInfo) to/from the byte payloads stored in journal records.
+    private WireFormat wireFormat = new OpenWireFormat();
+
+    public void setBrokerName(String brokerName) {
+    }
+    public void setUsageManager(SystemUsage usageManager) {
+    }
+
+    /**
+     * Returns a {@link TransactionStore} that records prepares/commits/rollbacks
+     * as journal commands and recovers prepared XA transactions from the
+     * in-memory {@code preparedTransactions} map rebuilt during journal replay.
+     */
+    public TransactionStore createTransactionStore() throws IOException {
+        return new TransactionStore(){
+            
+            public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+                store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true);
+            }
+            public void prepare(TransactionId txid) throws IOException {
+                store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true);
+            }
+            public void rollback(TransactionId txid) throws IOException {
+                store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false);
+            }
+            public void recover(TransactionRecoveryListener listener) throws IOException {
+                // Replay every prepared-but-uncommitted XA transaction to the
+                // listener as the original Message / MessageAck objects.
+                for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
+                    XATransactionId xid = (XATransactionId)entry.getKey();
+                    ArrayList<Message> messageList = new ArrayList<Message>();
+                    ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
+                    
+                    for (Operation op : entry.getValue()) {
+                        if( op.getClass() == AddOpperation.class ) {
+                            AddOpperation addOp = (AddOpperation)op;
+                            Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) );
+                            messageList.add(msg);
+                        } else {
+                            RemoveOpperation rmOp = (RemoveOpperation)op;
+                            MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) );
+                            ackList.add(ack);
+                        }
+                    }
+                    
+                    Message[] addedMessages = new Message[messageList.size()];
+                    MessageAck[] acks = new MessageAck[ackList.size()];
+                    messageList.toArray(addedMessages);
+                    ackList.toArray(acks);
+                    listener.recover(xid, addedMessages, acks);
+                }
+            }
+            public void start() throws Exception {
+            }
+            public void stop() throws Exception {
+            }
+        };
+    }
+
+    /**
+     * Queue flavored {@link MessageStore}.  Writes append journal commands;
+     * reads run inside a page-file transaction under {@code indexMutex}.
+     */
+    public class KahaDBMessageStore implements MessageStore {
+        private final ActiveMQDestination destination;
+        protected KahaDestination dest;
+
+        public KahaDBMessageStore(ActiveMQDestination destination) {
+            this.destination = destination;
+            this.dest = convert( destination );
+        }
+
+        public ActiveMQDestination getDestination() {
+            return destination;
+        }
+
+        public void addMessage(ConnectionContext context, Message message) throws IOException {
+            KahaAddMessageCommand command = new KahaAddMessageCommand();
+            command.setDestination(dest);
+            command.setMessageId(message.getMessageId().toString());
+            command.setTransactionInfo( createTransactionInfo(message.getTransactionId()) );
+
+            ByteSequence packet = wireFormat.marshal(message);
+            command.setMessage(ByteString.copyFrom(packet.getData(), packet.getOffset(), packet.getLength()));
+
+            // Only force the write to disk when the producer is waiting on a
+            // response; async sends tolerate a small window of loss.
+            store(command, isSyncWrites() && message.isResponseRequired());
+            
+        }
+        
+        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
+            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
+            command.setDestination(dest);
+            command.setMessageId(ack.getLastMessageId().toString());
+            command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()) );
+            store(command, isSyncWrites() && ack.isResponseRequired());
+        }
+
+        public void removeAllMessages(ConnectionContext context) throws IOException {
+            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
+            command.setDestination(dest);
+            store(command, true);
+        }
+
+        public Message getMessage(MessageId identity) throws IOException {
+            final String key = identity.toString();
+            
+            // Hopefully one day the page file supports concurrent read operations... but for now we must
+            // externally synchronize...
+            Location location;
+            synchronized(indexMutex) {
+                location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>(){
+                    public Location execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        Long sequence = sd.messageIdIndex.get(tx, key);
+                        if( sequence ==null ) {
+                            return null;
+                        }
+                        return sd.orderIndex.get(tx, sequence).location;
+                    }
+                });
+            }
+            if( location == null ) {
+                return null;
+            }
+            
+            // Read the message payload outside the index lock.
+            return loadMessage(location);
+        }
+        
+        public int getMessageCount() throws IOException {
+            synchronized(indexMutex) {
+                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+                    public Integer execute(Transaction tx) throws IOException {
+                        // Iterate through all index entries to get a count of messages in the destination.
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        int rc=0;
+                        for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
+                            iterator.next();
+                            rc++;
+                        }
+                        return rc;
+                    }
+                });
+            }
+        }
+
+        public void recover(final MessageRecoveryListener listener) throws Exception {
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    public void execute(Transaction tx) throws Exception {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
+                            Entry<Long, MessageKeys> entry = iterator.next();
+                            listener.recoverMessage( loadMessage(entry.getValue().location) );
+                        }
+                    }
+                });
+            }
+        }
+
+        // Batch-recovery cursor: the next order-index sequence to deliver.
+        long cursorPos=0;
+        
+        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    public void execute(Transaction tx) throws Exception {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        Entry<Long, MessageKeys> entry=null;
+                        int counter = 0;
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+                            entry = iterator.next();
+                            listener.recoverMessage( loadMessage(entry.getValue().location ) );
+                            counter++;
+                            if( counter >= maxReturned ) {
+                                break;
+                            }
+                        }
+                        // Advance past the last message actually delivered.
+                        if( entry!=null ) {
+                            cursorPos = entry.getKey()+1;
+                        }
+                    }
+                });
+            }
+        }
+
+        public void resetBatching() {
+            cursorPos=0;
+        }
+
+        public void setMemoryUsage(MemoryUsage memoryUsage) {
+        }
+        public void start() throws Exception {
+        }
+        public void stop() throws Exception {
+        }
+        
+    }
+        
+    /**
+     * Topic flavored store: adds durable-subscription bookkeeping.  Each
+     * subscription is keyed by {@code clientId + ":" + subscriptionName};
+     * per-subscription ack and batch positions are kept in
+     * {@code subscriptionAcks} / {@code subscriptionCursors}.
+     */
+    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
+        public KahaDBTopicMessageStore(ActiveMQTopic destination) {
+            super(destination);
+        }
+        
+        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
+            command.setDestination(dest);
+            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
+            command.setMessageId(messageId.toString());
+            // We are not passed a transaction info.. so we can't participate in a transaction.
+            // Looks like a design issue with the TopicMessageStore interface.  Also we can't recover the original ack
+            // to pass back to the XA recover method.
+            // command.setTransactionInfo();
+            store(command, false);
+        }
+
+        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
+            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
+            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
+            command.setDestination(dest);
+            command.setSubscriptionKey(subscriptionKey);
+            command.setRetroactive(retroactive);
+            ByteSequence packet = wireFormat.marshal(subscriptionInfo);
+            command.setSubscriptionInfo(ByteString.copyFrom(packet.getData(), packet.getOffset(), packet.getLength()));
+            store(command, isSyncWrites());
+        }
+
+        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+            // A KahaSubscriptionCommand without subscription info acts as a delete marker.
+            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
+            command.setDestination(dest);
+            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
+            store(command, isSyncWrites());
+        }
+
+        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+            
+            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>(){
+                    public void execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
+                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
+                            SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
+                            subscriptions.add(info);
+
+                        }
+                    }
+                });
+            }
+            
+            SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
+            subscriptions.toArray(rc);
+            return rc;
+        }
+
+        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            synchronized(indexMutex) {
+                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
+                    public SubscriptionInfo execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
+                        if( command ==null ) {
+                            return null;
+                        }
+                        return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) );
+                    }
+                });
+            }
+        }
+       
+        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            synchronized(indexMutex) {
+                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+                    public Integer execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                        if ( cursorPos==null ) {
+                            // The subscription might not exist.
+                            return 0;
+                        }
+                        cursorPos += 1;
+                        
+                        // Count every message ordered after the last ack position.
+                        int counter = 0;
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+                            iterator.next();
+                            counter++;
+                        }
+                        return counter;
+                    }
+                });
+            }        
+        }
+
+        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    public void execute(Transaction tx) throws Exception {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        // NOTE(review): unlike getMessageCount, this NPEs if the
+                        // subscription does not exist — confirm callers guarantee it.
+                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                        cursorPos += 1;
+                        
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+                            Entry<Long, MessageKeys> entry = iterator.next();
+                            listener.recoverMessage( loadMessage(entry.getValue().location ) );
+                        }
+                    }
+                });
+            }
+        }
+
+        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    public void execute(Transaction tx) throws Exception {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
+                        if( cursorPos == null ) {
+                            // First batch for this subscription: resume just past the last ack.
+                            cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                            cursorPos += 1;
+                        }
+                        
+                        Entry<Long, MessageKeys> entry=null;
+                        int counter = 0;
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+                            entry = iterator.next();
+                            listener.recoverMessage( loadMessage(entry.getValue().location ) );
+                            counter++;
+                            if( counter >= maxReturned ) {
+                                break;
+                            }
+                        }
+                        if( entry!=null ) {
+                            // FIX: advance past the last message actually delivered
+                            // (entry.getKey()+1), matching the queue-side batching above.
+                            // The previous cursorPos+1 only moved the cursor one slot per
+                            // batch, re-delivering already-recovered messages.
+                            sd.subscriptionCursors.put(subscriptionKey, entry.getKey()+1);
+                        }
+                    }
+                });
+            }
+        }
+
+        public void resetBatching(String clientId, String subscriptionName) {
+            try {
+                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+                synchronized(indexMutex) {
+                    pageFile.tx().execute(new Transaction.Closure<IOException>(){
+                        public void execute(Transaction tx) throws IOException {
+                            StoredDestination sd = getStoredDestination(dest, tx);
+                            sd.subscriptionCursors.remove(subscriptionKey);
+                        }
+                    });
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /** Builds the durable-subscription index key: {@code clientId ":" subscriptionName}. */
+    String subscriptionKey(String clientId, String subscriptionName){
+        return clientId+":"+subscriptionName;
+    }
+    
+    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+        return new KahaDBMessageStore(destination);
+    }
+
+    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
+        return new KahaDBTopicMessageStore(destination);
+    }
+    
+    public void deleteAllMessages() throws IOException {
+        // Deferred: the flag is acted on when the store is started/loaded.
+        deleteAllMessages=true;
+    }
+    
+    
+    public Set<ActiveMQDestination> getDestinations() {
+        try {
+            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>(){
+                    public void execute(Transaction tx) throws IOException {
+                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
+                            Entry<String, StoredDestination> entry = iterator.next();
+                            rc.add(convert(entry.getKey()));
+                        }
+                    }
+                });
+            }
+            return rc;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    public long getLastMessageBrokerSequenceId() throws IOException {
+        return 0;
+    }
+    
+    /** @return total disk footprint (journal + index), or 0 before start. */
+    public long size() {
+        if ( !started.get() ) {
+            return 0;
+        }
+        try {
+            return journal.getDiskSize() + pageFile.getDiskSize();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void beginTransaction(ConnectionContext context) throws IOException {
+        throw new IOException("Not yet implemented.");
+    }
+    public void commitTransaction(ConnectionContext context) throws IOException {
+        throw new IOException("Not yet implemented.");
+    }
+    public void rollbackTransaction(ConnectionContext context) throws IOException {
+        throw new IOException("Not yet implemented.");
+    }
+    
+    public void checkpoint(boolean sync) throws IOException {
+        // NOTE(review): the sync flag is ignored and the cleanup is always
+        // asynchronous — confirm whether callers rely on a synchronous checkpoint.
+        super.checkpointCleanup(false);
+    }
+    
+    
+    ///////////////////////////////////////////////////////////////////
+    // Internal helper methods.
+    ///////////////////////////////////////////////////////////////////
+
+    /**
+     * Reads the add-message record at the given journal location and
+     * unmarshals the original OpenWire {@link Message} from its payload.
+     *
+     * @param location journal position of a KahaAddMessageCommand record
+     * @return the unmarshalled message
+     * @throws IOException on journal read or unmarshal failure
+     */
+    Message loadMessage(Location location) throws IOException {
+        KahaAddMessageCommand addMessage = (KahaAddMessageCommand)load(location);
+        Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addMessage.getMessage().newInput()) );
+        return msg;
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Internal conversion methods.
+    ///////////////////////////////////////////////////////////////////
+    
+    /**
+     * Converts an ActiveMQ transaction id into the store's protobuf form,
+     * linking it to the previous record of the same in-flight transaction
+     * so the chain can be walked backwards at recovery time.
+     * Returns null for non-transacted operations.
+     */
+    KahaTransactionInfo createTransactionInfo(TransactionId txid) {
+        if( txid ==null ) {
+            return null;
+        }
+        KahaTransactionInfo rc = new KahaTransactionInfo();
+        
+        // Link it up to the previous record that was part of the transaction.
+        ArrayList<Operation> tx = inflightTransactions.get(txid);
+        if( tx!=null ) {
+            rc.setPreviousEntry(convert(tx.get(tx.size()-1).location));
+        }
+        
+        if( txid.isLocalTransaction() ) {
+            LocalTransactionId t = (LocalTransactionId)txid;
+            KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
+            kahaTxId.setConnectionId(t.getConnectionId().getValue());
+            kahaTxId.setTransacitonId(t.getValue());
+            rc.setLocalTransacitonId(kahaTxId);
+        } else {
+            XATransactionId t = (XATransactionId)txid;
+            KahaXATransactionId kahaTxId = new KahaXATransactionId();
+            kahaTxId.setBranchQualifier(ByteString.copyFrom(t.getBranchQualifier()));
+            kahaTxId.setGlobalTransactionId(ByteString.copyFrom(t.getGlobalTransactionId()));
+            kahaTxId.setFormatId(t.getFormatId());
+            rc.setXaTransacitonId(kahaTxId);
+        }
+        return rc;
+    }
+    
+    /** Converts a journal {@link Location} to its protobuf representation. */
+    KahaLocation convert(Location location) {
+        KahaLocation rc = new KahaLocation();
+        rc.setLogId(location.getDataFileId());
+        rc.setOffset(location.getOffset());
+        return rc;
+    }
+    
+    /**
+     * Converts an ActiveMQ destination to its protobuf representation.
+     * Returns null for composite/unknown destination types.
+     */
+    KahaDestination convert(ActiveMQDestination dest) {
+        KahaDestination rc = new KahaDestination();
+        rc.setName(dest.getPhysicalName());
+        switch( dest.getDestinationType() ) {
+        case ActiveMQDestination.QUEUE_TYPE:
+            rc.setType(DestinationType.QUEUE);
+            return rc;
+        case ActiveMQDestination.TOPIC_TYPE:
+            rc.setType(DestinationType.TOPIC);
+            return rc;
+        case ActiveMQDestination.TEMP_QUEUE_TYPE:
+            rc.setType(DestinationType.TEMP_QUEUE);
+            return rc;
+        case ActiveMQDestination.TEMP_TOPIC_TYPE:
+            rc.setType(DestinationType.TEMP_TOPIC);
+            return rc;
+        default:
+            return null;
+        }
+    }
+
+    /**
+     * Parses a stored destination key of the form {@code "<typeOrdinal>:<name>"}
+     * back into an ActiveMQ destination.
+     *
+     * @throws IllegalArgumentException if the key is malformed
+     */
+    ActiveMQDestination convert(String dest) {
+        int p = dest.indexOf(":");
+        if( p<0 ) {
+            throw new IllegalArgumentException("Not in the valid destination format");
+        }
+        int type = Integer.parseInt(dest.substring(0, p));
+        String name = dest.substring(p+1);
+        
+        switch( KahaDestination.DestinationType.valueOf(type) ) {
+        case QUEUE:
+            return new ActiveMQQueue(name);
+        case TOPIC:
+            return new ActiveMQTopic(name);
+        case TEMP_QUEUE:
+            return new ActiveMQTempQueue(name);
+        case TEMP_TOPIC:
+            return new ActiveMQTempTopic(name);
+        default:    
+            throw new IllegalArgumentException("Not in the valid destination format");
+        }
+    }
+    
+    
+}



Mime
View raw message