activemq-commits mailing list archives

From chir...@apache.org
Subject svn commit: r712827 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/ha/command/ main/java/org/apache/kahadb/journal/ main/java/org/apache/kahadb/page/ main/java/org/apache/kahadb/replication/ main/java/org/apache/kahadb/store/ test/java/...
Date Mon, 10 Nov 2008 20:41:23 GMT
Author: chirino
Date: Mon Nov 10 12:41:23 2008
New Revision: 712827

URL: http://svn.apache.org/viewvc?rev=712827&view=rev
Log:
More progress on replication: bulk file sync and real-time sync now seem to be working.
Need to work on handling all the failure conditions next.

Added:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java
Removed:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/ha/command/
Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Mon Nov 10 12:41:23 2008
@@ -347,7 +347,8 @@
 
                 // 
                 // is it just 1 big write?
-                if (wb.size == write.location.getSize()) {
+                ReplicationTarget replicationTarget = dataManager.getReplicationTarget();
+                if (wb.size == write.location.getSize() && replicationTarget==null) {
                     forceToDisk = write.sync | write.onComplete != null;
 
                     // Just write it directly..
@@ -360,7 +361,7 @@
 
                 } else {
 
-                    // Combine the smaller writes into 1 big buffer
+                    // We are going to do 1 big write.
                     while (write != null) {
                         forceToDisk |= write.sync | write.onComplete != null;
 
@@ -377,6 +378,11 @@
                     // Now do the 1 big write.
                     ByteSequence sequence = buff.toByteSequence();
                     file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+                    
+                    if( replicationTarget!=null ) {
+                    	replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
+                    }
+                    
                     buff.reset();
                 }
 
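Note: with a ReplicationTarget installed, the direct-write shortcut above is skipped, so even a single large write takes the batching path and replicate() always receives one contiguous ByteSequence for the whole write batch, anchored at the batch head's Location. A minimal sketch of a target -- hypothetical, not part of this commit -- that only meters what would be replicated:

    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.kahadb.journal.Location;
    import org.apache.kahadb.journal.ReplicationTarget;
    import org.apache.kahadb.util.ByteSequence;

    // Hypothetical: counts replicated bytes to illustrate the callback contract.
    public class MeteringReplicationTarget implements ReplicationTarget {
        private final AtomicLong bytes = new AtomicLong();
        public void replicate(Location location, ByteSequence sequence, boolean sync) {
            // location is the head of the write batch; sequence spans the batch.
            bytes.addAndGet(sequence.getLength());
        }
        public long totalBytes() { return bytes.get(); }
    }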

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Mon Nov 10 12:41:23 2008
@@ -93,7 +93,7 @@
     protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
 
     protected DataFileAppender appender;
-    protected DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
+    protected DataFileAccessorPool accessorPool;
 
     protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
     protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
@@ -104,6 +104,7 @@
     protected Runnable cleanupTask;
     protected final AtomicLong totalLength = new AtomicLong();
     protected boolean archiveDataLogs;
+	private ReplicationTarget replicationTarget;
 
     @SuppressWarnings("unchecked")
     public synchronized void start() throws IOException {
@@ -111,6 +112,7 @@
             return;
         }
 
+        accessorPool = new DataFileAccessorPool(this);
         started = true;
         preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
         lock();
@@ -176,8 +178,11 @@
             }
         }
 
-        storeState(false);
-
+        ByteSequence storedState = storeState(true);
+        if( dataFiles.isEmpty() ) {
+          appender.storeItem(storedState, Location.MARK_TYPE, true);
+        }
+        
         cleanupTask = new Runnable() {
             public void run() {
                 cleanup();
@@ -317,6 +322,9 @@
         fileByFileMap.clear();
         controlFile.unlock();
         controlFile.dispose();
+        controlFile=null;
+        dataFiles.clear();
+        lastAppendLocation.set(null);
         started = false;
     }
 
@@ -413,6 +421,11 @@
         return mark;
     }
 
+	public synchronized void appendedExternally(Location loc, int length) throws IOException {
+		DataFile dataFile = getDataFile(loc);
+		dataFile.incrementLength(length);
+	}
+
     public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
 
         Location cur = null;
@@ -541,10 +554,10 @@
         storeState(sync);
     }
 
-    protected synchronized void storeState(boolean sync) throws IOException {
+    protected synchronized ByteSequence storeState(boolean sync) throws IOException {
         ByteSequence state = marshallState();
-        appender.storeItem(state, Location.MARK_TYPE, sync);
         controlFile.store(state, sync);
+        return state;
     }
 
     public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
@@ -660,4 +673,12 @@
         return rc;
     }
 
+	public void setReplicationTarget(ReplicationTarget replicationTarget) {
+		this.replicationTarget = replicationTarget;
+	}
+	public ReplicationTarget getReplicationTarget() {
+		return replicationTarget;
+	}
+
+
 }
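For context, appendedExternally() is the slave-side half of the new hook: the replica writes the master's bytes straight into the journal file, then tells the Journal to bump the DataFile's in-memory length so getNextLocation() can iterate over the new records. Expected calling sequence, as a sketch (journalFileFor() is a hypothetical helper; the rest are names from this commit):

    // Assumes `data` arrived in a JOURNAL_UPDATE frame for `location`.
    RandomAccessFile file = new RandomAccessFile(journalFileFor(location), "rw");
    try {
        file.seek(location.getOffset());
        file.write(data);
    } finally {
        file.close();
    }
    journal.appendedExternally(location, data.length); // fix up the file length
    store.incrementalRecover();                        // replay the new records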

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java?rev=712827&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java Mon Nov 10 12:41:23 2008
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import org.apache.kahadb.util.ByteSequence;
+
+public interface ReplicationTarget {
+
+	void replicate(Location location, ByteSequence sequence, boolean sync);
+
+}
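The only implementor in this commit is ReplicationMaster (below), but the wiring is generic. Roughly -- illustrative only, the real setup is in ReplicationMaster.start():

    journal.setReplicationTarget(new ReplicationTarget() {
        public void replicate(Location location, ByteSequence sequence, boolean sync) {
            // push (location, bytes, sync) to every subscribed slave session
        }
    });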

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Mon Nov 10 12:41:23 2008
@@ -457,11 +457,11 @@
         return new File(directory, IOHelper.toFileSystemSafeName(name)+".dat");
     }
     
-    private File getFreeFile() {
+    public File getFreeFile() {
         return new File(directory, IOHelper.toFileSystemSafeName(name)+".fre");
     } 
 
-    private File getRecoveryFile() {
+    public File getRecoveryFile() {
         return new File(directory, IOHelper.toFileSystemSafeName(name)+".rec");
     } 
 
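These two accessors go public so the replication slave can delete the page file's assisting files after installing a freshly transferred database snapshot; see commitBulkTransfer() in ReplicationSlave below:

    // Stale recovery/free files must not be reused against a new snapshot.
    PageFile pageFile = replicationServer.getStore().getPageFile();
    pageFile.getRecoveryFile().delete();
    pageFile.getFreeFile().delete();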

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Mon Nov 10 12:41:23 2008
@@ -23,6 +23,7 @@
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
@@ -38,15 +39,20 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.journal.DataFile;
 import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.journal.ReplicationTarget;
 import org.apache.kahadb.replication.pb.PBFileInfo;
 import org.apache.kahadb.replication.pb.PBHeader;
 import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBJournalUpdate;
 import org.apache.kahadb.replication.pb.PBSlaveInit;
 import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
 import org.apache.kahadb.replication.pb.PBType;
 import org.apache.kahadb.store.KahaDBStore;
+import org.apache.kahadb.util.ByteSequence;
 
-public class ReplicationMaster implements Service, ClusterListener {
+import com.google.protobuf.ByteString;
+
+public class ReplicationMaster implements Service, ClusterListener, ReplicationTarget {
 
 	private static final Log LOG = LogFactory.getLog(ReplicationServer.class);
 
@@ -55,7 +61,7 @@
 	private Object serverMutex = new Object() {
 	};
 	private TransportServer server;
-	private ArrayList<ReplicationSession> sessions = new ArrayList<ReplicationSession>();
+	private CopyOnWriteArrayList<ReplicationSession> sessions = new CopyOnWriteArrayList<ReplicationSession>();
 
 	public ReplicationMaster(ReplicationServer replication1Server) {
 		this.replicationServer = replication1Server;
@@ -83,6 +89,7 @@
 			});
 			server.start();
 		}
+		replicationServer.getStore().getJournal().setReplicationTarget(this);
 	}
 
 	public void stop() throws Exception {
@@ -95,8 +102,34 @@
 	}
 
 	public void onClusterChange(ClusterState config) {
-		// TODO: if a slave is removed from the cluster, we should
-		// remove it's replication tracking info.
+	}
+
+
+	/**
+	 * This is called by the Journal so that we can replicate the update to the 
+	 * slaves.
+	 */
+	@Override
+	public void replicate(Location location, ByteSequence sequence, boolean sync) {
+		if( sessions.isEmpty() ) 
+			return;
+		ReplicationFrame frame = new ReplicationFrame();
+		frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
+		PBJournalUpdate payload = new PBJournalUpdate();
+		payload.setLocation(convert(location));
+		payload.setData(ByteString.copyFrom(sequence.getData(), sequence.getOffset(), sequence.getLength()));
+		frame.setPayload(payload);
+
+		for (ReplicationSession session : sessions) {
+			if( session.subscribedToJournalUpdates.get() ) {
+				// TODO: use async send threads so that the frames can be pushed out in parallel. 
+				try {
+					session.transport.oneway(frame);
+				} catch (IOException e) {
+					session.onException(e);
+				}
+			}
+		}
 	}
 
 	class ReplicationSession implements Service, TransportListener {
@@ -171,8 +204,13 @@
 					// synced to disk, but while a lock is still held on the
 					// store so that no
 					// updates are allowed.
-
 					ArrayList<PBFileInfo> infos = new ArrayList<PBFileInfo>();
+
+					Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
+					for (DataFile df : journalFiles.values()) {
 					infos.add(replicationServer.createInfo("journal-" + df.getDataFileId(), df.getFile(), df.getLength()));
+					}
+					
 					SnapshotStatus snapshot = createSnapshot();
 					PBFileInfo databaseInfo = new PBFileInfo();
 					databaseInfo.setName("database");
@@ -181,11 +219,7 @@
 					databaseInfo.setEnd(snapshot.size);
 					databaseInfo.setChecksum(snapshot.checksum);
 					infos.add(databaseInfo);
-
-					Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
-					for (DataFile df : journalFiles.values()) {
-						infos.add(createInfo("journal/" + df.getDataFileId(), df.getFile(), df.getLength()));
-					}
+					
 					rcPayload.setCopyFilesList(infos);
 				}
 			});
@@ -284,25 +318,9 @@
 		}
 	}
 
-	private PBFileInfo createInfo(String name, File file, int length) throws IOException {
-		PBFileInfo rc = new PBFileInfo();
-		rc.setName(name);
-		FileInputStream is = new FileInputStream(file);
-		byte buffer[] = new byte[1024 * 4];
-		int c;
-
-		long size = 0;
-		Checksum checksum = new Adler32();
-		while (size < length && (c = is.read(buffer, 0, (int) Math.min(length - size, buffer.length))) >= 0) {
-			checksum.update(buffer, 0, c);
-			size += c;
-		}
-		rc.setChecksum(checksum.getValue());
-		rc.setStart(0);
-		rc.setEnd(size);
-		return rc;
-	}
+
 
 	private void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation journalLocation) {
 	}
+
 }
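One shape the TODO in replicate() could take -- entirely hypothetical, not in this commit -- is a sender thread per session, so one slow slave cannot stall the journal appender or the other slaves. Sketched as members of ReplicationSession:

    // Hypothetical per-session async sender; assumes java.util.concurrent.
    private final ExecutorService sender = Executors.newSingleThreadExecutor();

    void sendAsync(final ReplicationFrame frame) {
        sender.execute(new Runnable() {
            public void run() {
                try {
                    transport.oneway(frame);
                } catch (IOException e) {
                    onException(e);
                }
            }
        });
    }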

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java Mon Nov 10 12:41:23 2008
@@ -17,12 +17,16 @@
 package org.apache.kahadb.replication;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
 
 import org.apache.activemq.Service;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.page.PageFile;
+import org.apache.kahadb.replication.pb.PBFileInfo;
 import org.apache.kahadb.store.KahaDBStore;
 
 /**
@@ -82,6 +86,8 @@
 
 	private ClusterState clusterState;
 
+	private File tempReplicationDir;
+
 	public void start() throws Exception {
 		// The cluster will let us know about the cluster configuration,
 		// which lets us decide if we are going to be a slave or a master.
@@ -112,6 +118,7 @@
 					// If the slave service was not yet started.. start it up.
 					if (slave == null) {
 						LOG.info("Starting replication slave.");
+						store.open();
 						slave = new ReplicationSlave(this);
 						slave.start();
 					}
@@ -168,10 +175,10 @@
 	public File getReplicationFile(String fn) throws IOException {
 		if (fn.equals("database")) {
 			return getStore().getPageFile().getFile();
-		} if (fn.startsWith("journal/")) {
+		} if (fn.startsWith("journal-")) {
 			int id;
 			try {
-				id = Integer.parseInt(fn.substring("journal/".length()));
+				id = Integer.parseInt(fn.substring("journal-".length()));
 			} catch (NumberFormatException e) {
 				throw new IOException("Unknown replication file name: "+fn);
 			}
@@ -181,6 +188,47 @@
 		}
 	}
 
+	public File getTempReplicationDir() {
+		if( tempReplicationDir == null ) {
+			tempReplicationDir = new File( getStore().getDirectory(), "replication");
+		}
+		return tempReplicationDir;
+	}
+	
+	public File getTempReplicationFile(String fn, int snapshotId) throws IOException {
+		if (fn.equals("database")) {
+			return new File(getTempReplicationDir(), "database-"+snapshotId);
+		} if (fn.startsWith("journal-")) {
+			int id;
+			try {
+				id = Integer.parseInt(fn.substring("journal-".length()));
+			} catch (NumberFormatException e) {
+				throw new IOException("Unknown replication file name: "+fn);
+			}
+			return new File(getTempReplicationDir(), fn);
+		} else {
+			throw new IOException("Unknown replication file name: "+fn);
+		}
+	}
+	
+	PBFileInfo createInfo(String name, File file, long length) throws IOException {
+		PBFileInfo rc = new PBFileInfo();
+		rc.setName(name);
+		FileInputStream is = new FileInputStream(file);
+		byte buffer[] = new byte[1024 * 4];
+		int c;
+
+		long size = 0;
+		Checksum checksum = new Adler32();
+		while (size < length && (c = is.read(buffer, 0, (int) Math.min(length - size, buffer.length))) >= 0) {
+			checksum.update(buffer, 0, c);
+			size += c;
+		}
+		rc.setChecksum(checksum.getValue());
+		rc.setStart(0);
+		rc.setEnd(size);
+		return rc;
+	}
 	public boolean isMaster() {
 		return master!=null;
 	}
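For reference, the replication file namespace that getReplicationFile() and getTempReplicationFile() now agree on ("journal-<id>" replaces the earlier "journal/<id>", keeping a path separator out of the name):

    // Illustrative lookups against a ReplicationServer `server`:
    File page    = server.getReplicationFile("database");        // the page file
    File journal = server.getReplicationFile("journal-42");      // data file id 42
    File temp    = server.getTempReplicationFile("database", 7); // "database-7" in
                                                                 // <store>/replication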

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Mon Nov 10 12:41:23 2008
@@ -16,14 +16,20 @@
  */
 package org.apache.kahadb.replication;
 
+import java.io.DataOutput;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.RandomAccessFile;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.Service;
@@ -32,13 +38,22 @@
 import org.apache.activemq.transport.TransportListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.journal.DataFile;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.replication.pb.PBFileInfo;
 import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBJournalUpdate;
 import org.apache.kahadb.replication.pb.PBSlaveInit;
 import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
 import org.apache.kahadb.replication.pb.PBType;
+import org.apache.kahadb.store.KahaDBStore;
 
 public class ReplicationSlave implements Service, ClusterListener, TransportListener {
+	
+	private static final int MAX_TRANSFER_SESSIONS = 1;
+
 	private static final Log LOG = LogFactory.getLog(ReplicationSlave.class);
 
 	private final ReplicationServer replicationServer;
@@ -52,15 +67,76 @@
 		transport = TransportFactory.connect(new URI(replicationServer.getClusterState().getMaster()));
 		transport.setTransportListener(this);
 		transport.start();
+
+		// Make sure the replication directory exists.
+		replicationServer.getTempReplicationDir().mkdirs();
 		
 		ReplicationFrame frame = new ReplicationFrame();
 		frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
 		PBSlaveInit payload = new PBSlaveInit();
 		payload.setNodeId(replicationServer.getNodeId());
+		
+		// This call back is executed once the checkpoint is
+		// completed and all data has been
+		// synced to disk, but while a lock is still held on the
+		// store so that no
+		// updates are allowed.
+
+		HashMap<String, PBFileInfo> infosMap = new HashMap<String, PBFileInfo>();
+		
+		// Add all the files that were being transferred..
+		File tempReplicationDir = replicationServer.getTempReplicationDir();
+		File[] list = tempReplicationDir.listFiles();
+		if( list!=null ) {
+			for (File file : list) {
+				String name = file.getName();
+				if( name.startsWith("database-") ) {
+					int snapshot;
+					try {
+						snapshot = Integer.parseInt(name.substring("database-".length()));
+					} catch (NumberFormatException e) {
+						continue;
+					}
+					
+					PBFileInfo info = replicationServer.createInfo("database", file, file.length());
+					info.setSnapshotId(snapshot);
+					infosMap.put("database", info);
+				} else if( name.startsWith("journal-") ) {
+					PBFileInfo info = replicationServer.createInfo(name, file, file.length());
+					infosMap.put(name, info);
+				}
+			}
+		}
+		
+		// Add all the db files that were not getting transferred..
+		KahaDBStore store = replicationServer.getStore();
+		Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
+		for (DataFile df : journalFiles.values()) {
+			String name = "journal-" + df.getDataFileId();
+			// Did we have a transfer in progress for that file already?
+			if( infosMap.containsKey(name) ) {
+				continue;
+			}
+			infosMap.put(name, replicationServer.createInfo(name, df.getFile(), df.getLength()));
+		}
+		if( !infosMap.containsKey("database") ) {
+			File pageFile = store.getPageFile().getFile();
+			if( pageFile.exists() ) {
+				infosMap.put("database", replicationServer.createInfo("database", pageFile, pageFile.length()));
+			}
+		}
+		
+		ArrayList<PBFileInfo> infos = new ArrayList<PBFileInfo>(infosMap.size());
+		for (PBFileInfo info : infosMap.values()) {
+			infos.add(info);
+		}
+		payload.setCurrentFilesList(infos);
+		
 		frame.setPayload(payload);
-		LOG.info("Sending master slave init command: "+payload);
+		LOG.info("Sending master slave init command: " + payload);
+		bulkSynchronizing = true;
 		transport.oneway(frame);
-		
+
 	}
 
 	public void stop() throws Exception {
@@ -76,6 +152,8 @@
 			case SLAVE_INIT_RESPONSE:
 				onSlaveInitResponse(frame, (PBSlaveInitResponse) frame.getPayload());
 				break;
+			case JOURNAL_UPDATE:
+				onJournalUpdate(frame, (PBJournalUpdate) frame.getPayload());
 			}
 		} catch (Exception e) {
 			failed(e);
@@ -102,32 +180,137 @@
 
 	private Object transferMutex = new Object();
 	private LinkedList<PBFileInfo> transferQueue = new LinkedList<PBFileInfo>();
+	private boolean bulkSynchronizing;
+	private PBSlaveInitResponse initResponse;
+
+	int journalUpdateFileId;
+	RandomAccessFile journalUpateFile;
+	
+	HashMap<String, PBFileInfo> bulkFiles = new HashMap<String, PBFileInfo>();
+	
+	private void onJournalUpdate(ReplicationFrame frame, PBJournalUpdate update) throws IOException {
+		boolean onlineRecovery=false;
+		PBJournalLocation location = update.getLocation();
+		byte[] data = update.getData().toByteArray();
+		synchronized (transferMutex) {
+			if( journalUpateFile==null || journalUpdateFileId!=location.getFileId() ) {
+				if( journalUpateFile!=null) {
+					journalUpateFile.close();
+				}
+				File file;
+				String name = "journal-"+location.getFileId();
+				if( bulkSynchronizing ) {
+					file = replicationServer.getTempReplicationFile(name, 0);
+					if( !bulkFiles.containsKey(name) ) {
+						bulkFiles.put(name, new PBFileInfo().setName(name));
+					}
+				} else {
+					// Once the data has been synced.. we are going to 
+					// go into an online recovery mode...
+					file = replicationServer.getReplicationFile(name);
+					onlineRecovery=true;
+				}
+				journalUpateFile = new RandomAccessFile(file, "rw");
+				journalUpdateFileId = location.getFileId();
+			}
+			journalUpateFile.seek(location.getOffset());
+			journalUpateFile.write(data);
+		}
+		
+		if( onlineRecovery ) {
+			KahaDBStore store = replicationServer.getStore();
+			// Let the journal know that we appended to one of it's files..
+			store.getJournal().appendedExternally(convert(location), data.length);
+			// Now incrementally recover those records.
+			store.incrementalRecover();
+		}
+	}
+	
+	private Location convert(PBJournalLocation location) {
+		Location rc = new Location();
+		rc.setDataFileId(location.getFileId());
+		rc.setOffset(location.getOffset());
+		return rc;
+	}
+	
+	private void commitBulkTransfer() throws IOException {
+		synchronized (transferMutex) {
+			
+			journalUpateFile.close();
+			journalUpateFile=null;
+			replicationServer.getStore().close();
+			
+			// If we got a new snapshot of the database, then we need to 
+			// delete its assisting files too.
+			if( bulkFiles.containsKey("database") ) {
+				PageFile pageFile = replicationServer.getStore().getPageFile();
+				pageFile.getRecoveryFile().delete();
+				pageFile.getFreeFile().delete();
+			}
+			
+			for (PBFileInfo info : bulkFiles.values()) {
+				File from = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
+				File to = replicationServer.getReplicationFile(info.getName());
+				move(from, to);
+			}
+			
+			delete(initResponse.getDeleteFilesList());
+			bulkSynchronizing=false;
+			
+			replicationServer.getStore().open();
+		}
+		replicationServer.getStore().incrementalRecover();
+	}
 
 	private void onSlaveInitResponse(ReplicationFrame frame, PBSlaveInitResponse response) throws Exception {
-		LOG.info("Got init response: "+response);
-		delete(response.getDeleteFilesList());
-		synchronized(transferMutex) {
+		LOG.info("Got init response: " + response);
+		initResponse = response;
+		
+		synchronized (transferMutex) {
+			bulkFiles.clear();
+			
+			List<PBFileInfo> infos = response.getCopyFilesList();
+			for (PBFileInfo info : infos) {
+				
+				bulkFiles.put(info.getName(), info);
+				File target = replicationServer.getReplicationFile(info.getName());
+				// are we just appending to an existing journal file?
+				if( info.getName().startsWith("journal-") && info.getStart() > 0 && target.exists() ) {
+					// Then copy across the first bits..
+					File tempFile = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
+					
+					FileInputStream is = new FileInputStream(target);
+					FileOutputStream os = new FileOutputStream(tempFile);
+					try {
+						copy(is, os, info.getStart());
+					} finally {
+						try { is.close(); } catch (Throwable e){}
+						try { os.close(); } catch (Throwable e){}
+					}
+				}
+			}
+			
+			
 			transferQueue.clear();
-			transferQueue.addAll(response.getCopyFilesList());
+			transferQueue.addAll(infos);
 		}
 		addTransferSession();
 	}
 
-		
 	private PBFileInfo dequeueTransferQueue() throws Exception {
-		synchronized( transferMutex ) {
-			if( transferQueue.isEmpty() ) {
+		synchronized (transferMutex) {
+			if (transferQueue.isEmpty()) {
 				return null;
 			}
 			return transferQueue.removeFirst();
 		}
 	}
-	
+
 	LinkedList<TransferSession> transferSessions = new LinkedList<TransferSession>();
-	
+
 	private void addTransferSession() throws Exception {
-		synchronized( transferMutex ) {
-			while( !transferQueue.isEmpty() && transferSessions.size()<5 ) {
+		synchronized (transferMutex) {
+			while (!transferQueue.isEmpty() && transferSessions.size() < MAX_TRANSFER_SESSIONS) {
 				TransferSession transferSession = new TransferSession();
 				transferSessions.add(transferSession);
 				try {
@@ -136,11 +319,43 @@
 					transferSessions.remove(transferSession);
 				}
 			}
+			// Once we are done processing all the transfers..
+			if (transferQueue.isEmpty() && transferSessions.isEmpty()) {
+				commitBulkTransfer();
+			}
 		}
 	}
-	
-	class TransferSession implements Service, TransportListener {
+
+	private void move(File from, File to) throws IOException {
 		
+		// If a simple rename/mv does not work..
+		to.delete();
+		if (!from.renameTo(to)) {
+			
+			// Copy and Delete.
+			FileInputStream is = null;
+			FileOutputStream os = null;
+			try {
+				is = new FileInputStream(from);
+				os = new FileOutputStream(to);
+
+				os.getChannel().transferFrom(is.getChannel(), 0, is.getChannel().size());
+			} finally {
+				try {
+					is.close();
+				} finally {
+				}
+				try {
+					os.close();
+				} finally {
+				}
+			}
+			from.delete();
+		}
+	}
+
+	class TransferSession implements Service, TransportListener {
+
 		Transport transport;
 		private PBFileInfo info;
 		private File toFile;
@@ -158,44 +373,44 @@
 		private void sendNextRequestOrStop() {
 			try {
 				PBFileInfo info = dequeueTransferQueue();
-				if( info !=null ) {
-				
-					toFile = replicationServer.getReplicationFile(info.getName());
+				if (info != null) {
+
+					toFile = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
 					this.info = info;
-					
+
 					ReplicationFrame frame = new ReplicationFrame();
 					frame.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER));
 					frame.setPayload(info);
-					
-					LOG.info("Requesting file: "+info.getName());
+
+					LOG.info("Requesting file: " + info.getName());
 					transferStart = System.currentTimeMillis();
-					
+
 					transport.oneway(frame);
 				} else {
 					stop();
 				}
-				
-			} catch ( Exception e ) {
+
+			} catch (Exception e) {
 				failed(e);
 			}
 		}
 
 		public void stop() throws Exception {
-			if( stopped.compareAndSet(false, true) ) {
+			if (stopped.compareAndSet(false, true)) {
 				LOG.info("File transfer session stopped.");
-				synchronized( transferMutex ) {
-					if( info!=null ) {
+				synchronized (transferMutex) {
+					if (info != null) {
 						transferQueue.addLast(info);
 					}
 					info = null;
 				}
-				Thread stopThread = new Thread("Transfer Session Shutdown: "+transport.getRemoteAddress()) {
+				Thread stopThread = new Thread("Transfer Session Shutdown: " + transport.getRemoteAddress()) {
 					@Override
 					public void run() {
 						try {
 							transport.stop();
-							synchronized( transferMutex ) {
-								transferSessions.remove(this);
+							synchronized (transferMutex) {
+								transferSessions.remove(TransferSession.this);
 								addTransferSession();
 							}
 						} catch (Exception e) {
@@ -211,21 +426,23 @@
 		public void onCommand(Object command) {
 			try {
 				ReplicationFrame frame = (ReplicationFrame) command;
-				InputStream is = (InputStream) frame.getPayload();		
+				InputStream is = (InputStream) frame.getPayload();
 				toFile.getParentFile().mkdirs();
-				FileOutputStream os = new FileOutputStream( toFile );
+				
+				RandomAccessFile os = new RandomAccessFile(toFile, "rw");
+				os.seek(info.getStart());
 				try {
 					copy(is, os, frame.getHeader().getPayloadSize());
-					long transferTime = System.currentTimeMillis()-this.transferStart;
-					float rate = frame.getHeader().getPayloadSize()*transferTime/1024000f;
-					LOG.info("File "+info.getName()+" transfered in "+transferTime+" (ms) at "+rate+" Kb/Sec");
+					long transferTime = System.currentTimeMillis() - this.transferStart;
+					float rate = frame.getHeader().getPayloadSize() * transferTime / 1024000f;
+					LOG.info("File " + info.getName() + " transfered in " + transferTime + " (ms) at " +
rate + " Kb/Sec");
 				} finally {
 					os.close();
 				}
 				this.info = null;
 				this.toFile = null;
-				
-				sendNextRequestOrStop();				
+
+				sendNextRequestOrStop();
 			} catch (Exception e) {
 				failed(e);
 			}
@@ -237,8 +454,8 @@
 
 		public void failed(Exception error) {
 			try {
-				if( !stopped.get() ) {
-					LOG.warn("Replication session failure: "+transport.getRemoteAddress());
+				if (!stopped.get()) {
+					LOG.warn("Replication session failure: " + transport.getRemoteAddress());
 				}
 				stop();
 			} catch (Exception ignore) {
@@ -247,21 +464,32 @@
 
 		public void transportInterupted() {
 		}
+
 		public void transportResumed() {
 		}
 
 	}
-	
+
 	private void copy(InputStream is, OutputStream os, long length) throws IOException {
 		byte buffer[] = new byte[1024 * 4];
-		int c=0;
-		long pos=0;
-		while ( pos <length && ((c = is.read(buffer, 0, (int)Math.min(buffer.length, length-pos))) >= 0) ) {
+		int c = 0;
+		long pos = 0;
+		while (pos < length && ((c = is.read(buffer, 0, (int) Math.min(buffer.length, length - pos))) >= 0)) {
 			os.write(buffer, 0, c);
-			pos+=c;
+			pos += c;
 		}
 	}
-
+	
+	private void copy(InputStream is, DataOutput os, long length) throws IOException {
+		byte buffer[] = new byte[1024 * 4];
+		int c = 0;
+		long pos = 0;
+		while (pos < length && ((c = is.read(buffer, 0, (int) Math.min(buffer.length, length - pos))) >= 0)) {
+			os.write(buffer, 0, c);
+			pos += c;
+		}
+	}
+	
 	private void delete(List<String> files) {
 		for (String fn : files) {
 			try {
@@ -271,5 +499,4 @@
 		}
 	}
 
-
 }
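Condensed, the slave lifecycle implied by the code above (all names are from this commit; the flow itself is just a sketch):

    // start(): send SLAVE_INIT listing the files the slave already holds.
    // SLAVE_INIT_RESPONSE: queue bulk copies into <store-dir>/replication.
    // JOURNAL_UPDATE while bulkSynchronizing: spooled into temp journal files.
    // When transferQueue and transferSessions drain, commitBulkTransfer():
    //   close store -> move temp files into place -> delete stale .rec/.fre
    //   -> reopen store -> incrementalRecover().
    // JOURNAL_UPDATE afterwards: write through to the live journal, then
    //   appendedExternally() + incrementalRecover() (online recovery mode).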

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java Mon Nov 10 12:41:23 2008
@@ -51,6 +51,7 @@
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
 import org.apache.kahadb.store.data.KahaAddMessageCommand;
 import org.apache.kahadb.store.data.KahaCommitCommand;
 import org.apache.kahadb.store.data.KahaDestination;
@@ -579,6 +580,5 @@
             throw new IllegalArgumentException("Not in the valid destination format");
         }
     }
-    
-    
+        
 }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Mon Nov 10 12:41:23 2008
@@ -51,6 +51,7 @@
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
 import org.apache.kahadb.store.data.KahaAddMessageCommand;
 import org.apache.kahadb.store.data.KahaCommitCommand;
 import org.apache.kahadb.store.data.KahaDestination;
@@ -144,11 +145,11 @@
 
     protected boolean deleteAllMessages;
     protected File directory;
-    protected boolean recovering;
     protected Thread checkpointThread;
     protected boolean syncWrites=true;
     int checkpointInterval = 5*1000;
     int cleanupInterval = 30*1000;
+    boolean opened;
     
     protected AtomicBoolean started = new AtomicBoolean();
 
@@ -167,42 +168,8 @@
         }
     }
 
-    public void load() throws IOException {
-
-        recovering=true;
-        
-        // Creates the journal if it does not yet exist.
-        getJournal();
-        if (failIfJournalIsLocked) {
-            journal.lock();
-        } else {
-            while (true) {
-                try {
-                    journal.lock();
-                    break;
-                } catch (IOException e) {
-                    LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY
/ 1000) + " seconds for the journal to be unlocked.");
-                    try {
-                        Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
-                    } catch (InterruptedException e1) {
-                    }
-                }
-            }
-        }
-        
-        // Creates the page file if it does not yet exist.
-        getPageFile();
-
-        journal.start();
-        if (deleteAllMessages) {
-            pageFile.delete();
-            journal.delete();
-
-            LOG.info("Persistence store purged.");
-            deleteAllMessages = false;
-        }
-
-        synchronized (indexMutex) {
+	private void loadPageFile() throws IOException {
+		synchronized (indexMutex) {
             pageFile.load();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
@@ -216,8 +183,6 @@
                         metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
 
                         tx.store(metadata.page, metadataMarshaller, true);
-                        
-                        store(new KahaTraceCommand().setMessage("CREATED " + new Date()));
                     } else {
                         Page<Metadata> page = tx.load(0, metadataMarshaller);
                         metadata = page.get();
@@ -231,7 +196,8 @@
             pageFile.flush();
             
             // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
-            // Perhaps we should just keep an index of file 
+            // Perhaps we should just keep an index of file
+            storedDestinations.clear();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
@@ -241,13 +207,54 @@
                     }
                 }
             });
+        }
+	}
+	
+	public void open() throws IOException {
+		if( !opened ) {
+	        getJournal();
+	        if (failIfJournalIsLocked) {
+	            journal.lock();
+	        } else {
+	            while (true) {
+	                try {
+	                    journal.lock();
+	                    break;
+	                } catch (IOException e) {
+	                    LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY
/ 1000) + " seconds for the journal to be unlocked.");
+	                    try {
+	                        Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
+	                    } catch (InterruptedException e1) {
+	                    }
+	                }
+	            }
+	        }        
+	        getPageFile();
+	        journal.start();
+	        loadPageFile();
+	        opened=true;
+		}
+	}
+	
+    public void load() throws IOException {
+    	
+    	open();
+        if (deleteAllMessages) {
+            journal.delete();
 
-            // Replay the the journal to get the indexes up to date with the
-            // latest
-            // updates.
+            pageFile.unload();
+            pageFile.delete();
+            metadata = new Metadata();
+            
+            LOG.info("Persistence store purged.");
+            deleteAllMessages = false;
+            
+            loadPageFile();
+        }
+    	
+        synchronized (indexMutex) {
             recover();
         }
-        recovering=false;
 
         checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
             public void run() {
@@ -276,9 +283,18 @@
             }
         };
         checkpointThread.start();
-
     }
 
+    
+	public void close() throws IOException {
+        synchronized (indexMutex) {
+            pageFile.unload();
+            metadata = new Metadata();
+        }
+        journal.close();
+        opened=false;
+	}
+	
     public void unload() throws IOException, InterruptedException {
         checkpointThread.join();
 
@@ -293,12 +309,12 @@
                 }
             });
 
-//            metadata.destinations.unload(tx);
             pageFile.unload();
             metadata = new Metadata();
         }
         store(new KahaTraceCommand().setMessage("CLEAN SHUTDOWN " + new Date()));
         journal.close();
+        opened=false;
     }
 
     /**
@@ -328,45 +344,61 @@
      * @throws IllegalStateException
      */
     private void recover() throws IllegalStateException, IOException {
-
         long start = System.currentTimeMillis();
-        Location pos = null;
-        
-        // We need to recover the transactions..
-        if (metadata.firstInProgressTransactionLocation != null) {
-            pos = metadata.firstInProgressTransactionLocation;
-        }
         
-        // Perhaps there were no transactions...
-        if( pos==null && metadata.lastUpdate!=null) {
-            // Start replay at the record after the last one recorded in the index file.
-            pos = journal.getNextLocation(metadata.lastUpdate);
-            // No journal records need to be recovered.
-            if( pos == null ) {
-                return;
-            }
+        Location recoveryPosition = getRecoveryPosition();
+        if( recoveryPosition ==null ) {
+        	return;
         }
         
-        // Do we need to start from the begining?
-        if (pos == null) {
-            // This loads the first position.
-            pos = journal.getNextLocation(null);
-        }
-
         int redoCounter = 0;
-        LOG.info("Journal Recovery Started from: " + journal + " at " + pos.getDataFileId()
+ ":" + pos.getOffset());
+        LOG.info("Journal Recovery Started from: " + journal + " at " + recoveryPosition.getDataFileId()
+ ":" + recoveryPosition.getOffset());
 
-        while (pos != null) {
-            JournalCommand message = load(pos);
-            process(message, pos);
+        while (recoveryPosition != null) {
+            JournalCommand message = load(recoveryPosition);
+            process(message, recoveryPosition);
             redoCounter++;
-            pos = journal.getNextLocation(pos);
+            recoveryPosition = journal.getNextLocation(recoveryPosition);
         }
-
-        Location location = store(new KahaTraceCommand().setMessage("RECOVERED " + new Date()), true);
         long end = System.currentTimeMillis();
         LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start)
/ 1000.0f) + " seconds.");
     }
+    
+	private Location nextRecoveryPosition;
+	private Location lastRecoveryPosition;
+
+	public void incrementalRecover() throws IOException {
+        if( nextRecoveryPosition == null ) {
+        	if( lastRecoveryPosition==null ) {
+        		nextRecoveryPosition = getRecoveryPosition();
+        	} else {
+                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+        	}        	
+        }
+        while (nextRecoveryPosition != null) {
+        	lastRecoveryPosition = nextRecoveryPosition;
+            JournalCommand message = load(lastRecoveryPosition);
+            process(message, lastRecoveryPosition);            
+            nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+        }
+	}
+
+	private Location getRecoveryPosition() throws IOException {
+		
+        // If we need to recover the transactions..
+        if (metadata.firstInProgressTransactionLocation != null) {
+            return metadata.firstInProgressTransactionLocation;
+        }
+        
+        // Perhaps there were no transactions...
+        if( metadata.lastUpdate!=null) {
+            // Start replay at the record after the last one recorded in the index file.
+            return journal.getNextLocation(metadata.lastUpdate);
+        }
+        
+        // This loads the first position.
+        return journal.getNextLocation(null);
+	}
 
     protected void checkpointCleanup(final boolean cleanup) {
         try {
@@ -420,9 +452,7 @@
         data.writeFramed(os);
         Location location = journal.write(os.toByteSequence(), sync);
         process(data, location);
-        if( !recovering ) {
-            metadata.lastUpdate = location;
-        }
+        metadata.lastUpdate = location;
         return location;
     }
 
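Net effect of the MessageDatabase changes: load() is now open() plus a recovery pass, so a replication slave can open the store without a full journal replay and pull updates incrementally. Minimal usage sketch (store construction elided):

    KahaDBStore store = ...;    // directory etc. configured as usual
    store.open();               // lock + start the journal, load the page file
    // ... a replicated record is written and appendedExternally() is called ...
    store.incrementalRecover(); // replays only records after lastRecoveryPosition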

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Mon Nov 10 12:41:23 2008
@@ -21,8 +21,11 @@
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 
 import junit.framework.TestCase;
 
@@ -74,7 +77,7 @@
 		cluster.setClusterState(clusterState);
 		
 		try {
-			sendMesagesTo(500, BROKER1_URI);
+			sendMesagesTo(100, BROKER1_URI);
 		} catch( JMSException e ) {
 			fail("b1 did not become a master.");
 		}
@@ -86,13 +89,50 @@
 		clusterState.setSlaves(Arrays.asList(slaves));
 		cluster.setClusterState(clusterState);
 		
-		Thread.sleep(10000);
+		Thread.sleep(1000);
+		
+		
+		try {
+			sendMesagesTo(100, BROKER1_URI);
+		} catch( JMSException e ) {
+			fail("Failed to send more messages...");
+		}
+		
+		Thread.sleep(1000);
+		
+		// Make broker 2 the master.
+		clusterState = new ClusterState();
+		clusterState.setMaster(BROKER2_REPLICATION_ID);
+		cluster.setClusterState(clusterState);
+
+		Thread.sleep(1000);
+		
+		assertReceived(200, BROKER2_URI);
 		
 		b2.stop();		
 		b1.stop();
 		
 	}
 
+	private void assertReceived(int count, String brokerUri) throws JMSException {
+		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+		Connection con = cf.createConnection();
+		con.start();
+		try {
+			Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			MessageConsumer consumer = session.createConsumer(destination);
+			for (int i = 0; i < count; i++) {
+				TextMessage m = (TextMessage) consumer.receive(1000);
+				if( m==null ) {
+					fail("Failed to receive message: "+i);
+				}
+				System.out.println("Got: "+m.getText());
+			}
+		} finally {
+			try { con.close(); } catch (Throwable e) {}
+		}
+	}
+
 	private void sendMesagesTo(int count, String brokerUri) throws JMSException {
 		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
 		Connection con = cf.createConnection();


