activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r713419 - in /activemq/sandbox/kahadb/src/main/java/org/apache/kahadb: journal/Journal.java replication/ReplicationMaster.java replication/ReplicationServer.java replication/ReplicationSlave.java store/MessageDatabase.java
Date Wed, 12 Nov 2008 16:31:50 GMT
Author: chirino
Date: Wed Nov 12 08:31:48 2008
New Revision: 713419

URL: http://svn.apache.org/viewvc?rev=713419&view=rev
Log:
Fixed a couple of bugs which were cropping up in the perf test.

Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Wed Nov 12 08:31:48 2008
@@ -209,7 +209,6 @@
         }
         DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
         try {
-            reader.readLocationDetails(location);
             while (reader.readLocationDetailsAndValidate(location)) {
                 location.setOffset(location.getOffset() + location.getSize());
             }
@@ -426,6 +425,7 @@
 		if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
 			// It's an update to the current log file..
 			dataFile = dataFiles.getTail();
+			dataFile.incrementLength(length);
 		} else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
 			// It's an update to the next log file.
             int nextNum = loc.getDataFileId();
@@ -438,8 +438,6 @@
 		} else {
 			throw new IOException("Invalid external append.");
 		}
-
-		dataFile.incrementLength(length);
 	}
 
     public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
@@ -484,7 +482,7 @@
             // Load in location size and type.
             DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
             try {
-                reader.readLocationDetails(cur);
+				reader.readLocationDetails(cur);
             } finally {
                 accessorPool.closeDataFileAccessor(reader);
             }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Wed Nov 12 08:31:48 2008
@@ -24,7 +24,6 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -108,6 +107,7 @@
 	}
 
 	public void onClusterChange(ClusterState config) {
+		// For now, we don't really care about changes in the slave config..
 	}
 
 
@@ -115,7 +115,6 @@
 	 * This is called by the Journal so that we can replicate the update to the 
 	 * slaves.
 	 */
-	@Override
 	public void replicate(Location location, ByteSequence sequence, boolean sync) {
 		if( sessions.isEmpty() ) 
 			return;
@@ -259,7 +258,7 @@
 					for (DataFile df : journalFiles.values()) {
 						// Look at what the slave has so that only the missing bits are transfered.
 						String name = "journal-" + df.getDataFileId();
-						PBFileInfo slaveInfo = slaveFiles.get(name);
+						PBFileInfo slaveInfo = slaveFiles.remove(name);
 						
 						// Use the checksum info to see if the slave has the file already.. Checksums are less acurrate for
 						// small amounts of data.. so ignore small files.
@@ -292,6 +291,13 @@
 					snapshotInfos.add(info);
 					
 					rcPayload.setCopyFilesList(snapshotInfos);
+					ArrayList<String> deleteFiles = new ArrayList<String>();
+					slaveFiles.remove("database");
+					for (PBFileInfo unused : slaveFiles.values()) {
+						deleteFiles.add(unused.getName());
+					}
+					rcPayload.setDeleteFilesList(deleteFiles);
+					
 					
 					updateJournalReplicatedFiles();
 				}

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java Wed Nov 12 08:31:48 2008
@@ -118,7 +118,6 @@
 					// If the slave service was not yet started.. start it up.
 					if (slave == null) {
 						LOG.info("Starting replication slave.");
-						store.open();
 						slave = new ReplicationSlave(this);
 						slave.start();
 					}

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Wed Nov 12 08:31:48 2008
@@ -25,6 +25,7 @@
 import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -59,90 +60,164 @@
 	private final ReplicationServer replicationServer;
 	private Transport transport;
 
+	// Used to bulk transfer the master state over to the slave..
+	private final Object transferMutex = new Object();
+	private final LinkedList<PBFileInfo> transferQueue = new LinkedList<PBFileInfo>();
+	private final LinkedList<TransferSession> transferSessions = new LinkedList<TransferSession>();
+	private final HashMap<String, PBFileInfo> bulkFiles = new HashMap<String, PBFileInfo>();

+	private PBSlaveInitResponse initResponse;
+	private boolean online;
+	private final AtomicBoolean started = new AtomicBoolean();
+	
+	// Used to do real time journal updates..
+	int journalUpdateFileId;
+	RandomAccessFile journalUpateFile;
+	private String master;
+	
 	public ReplicationSlave(ReplicationServer replicationServer) {
 		this.replicationServer = replicationServer;
+		master = replicationServer.getClusterState().getMaster();
 	}
 
 	public void start() throws Exception {
-		transport = TransportFactory.connect(new URI(replicationServer.getClusterState().getMaster()));
-		transport.setTransportListener(this);
-		transport.start();
-
-		// Make sure the replication directory exists.
-		replicationServer.getTempReplicationDir().mkdirs();
-		
-		ReplicationFrame frame = new ReplicationFrame();
-		frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
-		PBSlaveInit payload = new PBSlaveInit();
-		payload.setNodeId(replicationServer.getNodeId());
-		
-		// This call back is executed once the checkpoint is
-		// completed and all data has been
-		// synced to disk, but while a lock is still held on the
-		// store so that no
-		// updates are allowed.
+		if( started.compareAndSet(false, true)) {
+			doStart();
+		}
+	}
+	
+	public void stop() throws Exception {
+		if( started.compareAndSet(true, false)) {
+			doStop();
+		}
+	}
 
-		HashMap<String, PBFileInfo> infosMap = new HashMap<String, PBFileInfo>();
-		
-		// Add all the files that were being transfered..
-		File tempReplicationDir = replicationServer.getTempReplicationDir();
-		File[] list = tempReplicationDir.listFiles();
-		if( list!=null ) {
-			for (File file : list) {
-				String name = file.getName();
-				if( name.startsWith("database-") ) {
-					int snapshot;
-					try {
-						snapshot = Integer.parseInt(name.substring("database-".length()));
-					} catch (NumberFormatException e) {
-						continue;
+	private void doStart() throws Exception, URISyntaxException, IOException {
+		synchronized (transferMutex) {
+			
+			// Failure recovery might be trying to start us back up,
+			// but the Replication server may have already stopped us so there is not need to start up.
+			if( !started.get() ) {
+				return;
+			}
+			
+			replicationServer.getStore().open();
+			
+			transport = TransportFactory.connect(new URI(master));
+			transport.setTransportListener(this);
+			transport.start();
+	
+			// Make sure the replication directory exists.
+			replicationServer.getTempReplicationDir().mkdirs();
+			
+			ReplicationFrame frame = new ReplicationFrame();
+			frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
+			PBSlaveInit payload = new PBSlaveInit();
+			payload.setNodeId(replicationServer.getNodeId());
+			
+			// This call back is executed once the checkpoint is
+			// completed and all data has been
+			// synced to disk, but while a lock is still held on the
+			// store so that no
+			// updates are allowed.
+	
+			HashMap<String, PBFileInfo> infosMap = new HashMap<String, PBFileInfo>();
+			
+			// Add all the files that were being transfered..
+			File tempReplicationDir = replicationServer.getTempReplicationDir();
+			File[] list = tempReplicationDir.listFiles();
+			if( list!=null ) {
+				for (File file : list) {
+					String name = file.getName();
+					if( name.startsWith("database-") ) {
+						int snapshot;
+						try {
+							snapshot = Integer.parseInt(name.substring("database-".length()));
+						} catch (NumberFormatException e) {
+							continue;
+						}
+						
+						PBFileInfo info = replicationServer.createInfo("database", file, 0, file.length());
+						info.setSnapshotId(snapshot);
+						infosMap.put("database", info);
+					} else if( name.startsWith("journal-") ) {
+						PBFileInfo info = replicationServer.createInfo(name, file, 0, file.length());
+						infosMap.put(name, info);
 					}
-					
-					PBFileInfo info = replicationServer.createInfo("database", file, 0, file.length());
-					info.setSnapshotId(snapshot);
-					infosMap.put("database", info);
-				} else if( name.startsWith("journal-") ) {
-					PBFileInfo info = replicationServer.createInfo(name, file, 0, file.length());
-					infosMap.put(name, info);
 				}
 			}
-		}
-		
-		// Add all the db files that were not getting transfered..
-		KahaDBStore store = replicationServer.getStore();
-		Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
-		for (DataFile df : journalFiles.values()) {
-			String name = "journal-" + df.getDataFileId();
-			// Did we have a transfer in progress for that file already?
-			if( infosMap.containsKey(name) ) {
-				continue;
-			}
-			infosMap.put(name, replicationServer.createInfo(name, df.getFile(), 0, df.getLength()));
-		}
-		if( !infosMap.containsKey("database") ) {
-			File pageFile = store.getPageFile().getFile();
-			if( pageFile.exists() ) {
-				infosMap.put("database", replicationServer.createInfo("database", pageFile, 0, pageFile.length()));
+			
+			// Add all the db files that were not getting transfered..
+			KahaDBStore store = replicationServer.getStore();
+			Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
+			for (DataFile df : journalFiles.values()) {
+				String name = "journal-" + df.getDataFileId();
+				// Did we have a transfer in progress for that file already?
+				if( infosMap.containsKey(name) ) {
+					continue;
+				}
+				infosMap.put(name, replicationServer.createInfo(name, df.getFile(), 0, df.getLength()));
+			}
+			if( !infosMap.containsKey("database") ) {
+				File pageFile = store.getPageFile().getFile();
+				if( pageFile.exists() ) {
+					infosMap.put("database", replicationServer.createInfo("database", pageFile, 0, pageFile.length()));
+				}
 			}
+			
+			ArrayList<PBFileInfo> infos = new ArrayList<PBFileInfo>(infosMap.size());
+			for (PBFileInfo info : infosMap.values()) {
+				infos.add(info);
+			}
+			payload.setCurrentFilesList(infos);
+			
+			frame.setPayload(payload);
+			LOG.info("Sending master slave init command: " + payload);
+			online = false;
+			transport.oneway(frame);
 		}
-		
-		ArrayList<PBFileInfo> infos = new ArrayList<PBFileInfo>(infosMap.size());
-		for (PBFileInfo info : infosMap.values()) {
-			infos.add(info);
-		}
-		payload.setCurrentFilesList(infos);
-		
-		frame.setPayload(payload);
-		LOG.info("Sending master slave init command: " + payload);
-		bulkSynchronizing = true;
-		transport.oneway(frame);
-
 	}
 
-	public void stop() throws Exception {
+	private void doStop() throws Exception, IOException {
+		synchronized (transferMutex) {
+			if( this.transport!=null ) {
+				this.transport.stop();
+				this.transport=null;
+			}
+	
+			// Stop any current transfer sessions.
+			for (TransferSession session : this.transferSessions) {
+				session.stop();
+			}
+	
+			this.transferQueue.clear();
+			
+			this.initResponse=null;
+			this.bulkFiles.clear();	
+			this.online=false;
+	
+			if( journalUpateFile !=null ) {
+				journalUpateFile.close();
+				journalUpateFile=null;
+			}
+			journalUpdateFileId=0;
+			
+			replicationServer.getStore().close();
+		}
 	}
 
 	public void onClusterChange(ClusterState config) {
+		synchronized (transferMutex) {
+			// When the master changes.. we need to re-sync with the new master.
+			if( !master.equals(config.getMaster()) ) {
+				try {
+					doStop();
+					master = config.getMaster();
+					doStart();
+				} catch (Exception e) {
+					LOG.error("Could not restart syncing with new master: "+config.getMaster()+", due to: "+e,e);
+				}
+			}
+		}
 	}
 
 	public void onCommand(Object command) {
@@ -164,29 +239,23 @@
 		failed(error);
 	}
 
-	public void failed(Exception error) {
+	public void failed(Throwable error) {
 		try {
-			LOG.warn("Replication session fail to master: "+transport.getRemoteAddress(), error);
-			stop();
+			if( started.get() ) {
+				LOG.warn("Replication session fail to master: "+transport.getRemoteAddress(), error);
+				doStop();
+				// Wait a little an try to establish the session again..
+				Thread.sleep(1000);
+				doStart();
+			}
 		} catch (Exception ignore) {
 		}
 	}
 
 	public void transportInterupted() {
 	}
-
 	public void transportResumed() {
 	}
-
-	private Object transferMutex = new Object();
-	private LinkedList<PBFileInfo> transferQueue = new LinkedList<PBFileInfo>();
-	private boolean bulkSynchronizing;
-	private PBSlaveInitResponse initResponse;
-
-	int journalUpdateFileId;
-	RandomAccessFile journalUpateFile;
-	
-	HashMap<String, PBFileInfo> bulkFiles = new HashMap<String, PBFileInfo>();
 	
 	private void onJournalUpdate(ReplicationFrame frame, PBJournalUpdate update) throws IOException {
 		boolean onlineRecovery=false;
@@ -199,7 +268,7 @@
 				}
 				File file;
 				String name = "journal-"+location.getFileId();
-				if( bulkSynchronizing ) {
+				if( !online ) {
 					file = replicationServer.getTempReplicationFile(name, 0);
 					if( !bulkFiles.containsKey(name) ) {
 						bulkFiles.put(name, new PBFileInfo().setName(name));
@@ -211,10 +280,12 @@
 				}
 				journalUpateFile = new RandomAccessFile(file, "rw");
 				journalUpdateFileId = location.getFileId();
-			}			
+			}
+			
+//			System.out.println("Writing: "+location.getFileId()+":"+location.getOffset()+" with "+data.length);
 			journalUpateFile.seek(location.getOffset());
 			journalUpateFile.write(data);
-			if( !bulkSynchronizing ) {
+			if( online ) {
 				onlineRecovery=true;
 			}
 		}
@@ -235,45 +306,52 @@
 		return rc;
 	}
 	
-	private void commitBulkTransfer() throws IOException {
-		synchronized (transferMutex) {
+	private void commitBulkTransfer() {
+		try {
 			
-			LOG.info("Slave synhcronization complete, going online...");
+			synchronized (transferMutex) {
+				
+				LOG.info("Slave synhcronization complete, going online...");
 
-			if( journalUpateFile!=null ) {
-				journalUpateFile.close();
-				journalUpateFile=null;
-			}
-			replicationServer.getStore().close();
-			
-			// If we got a new snapshot of the database, then we need to 
-			// delete it's assisting files too.
-			if( bulkFiles.containsKey("database") ) {
-				PageFile pageFile = replicationServer.getStore().getPageFile();
-				pageFile.getRecoveryFile().delete();
-				pageFile.getFreeFile().delete();
-			}
-			
-			for (PBFileInfo info : bulkFiles.values()) {
-				File from = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
-				File to = replicationServer.getReplicationFile(info.getName());
-				move(from, to);
+				replicationServer.getStore().close();
+				
+				if( journalUpateFile!=null ) {
+					journalUpateFile.close();
+					journalUpateFile=null;
+				}
+				
+				// If we got a new snapshot of the database, then we need to 
+				// delete it's assisting files too.
+				if( bulkFiles.containsKey("database") ) {
+					PageFile pageFile = replicationServer.getStore().getPageFile();
+					pageFile.getRecoveryFile().delete();
+					pageFile.getFreeFile().delete();
+				}
+				
+				for (PBFileInfo info : bulkFiles.values()) {
+					File from = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
+					File to = replicationServer.getReplicationFile(info.getName());
+					to.getParentFile().mkdirs();
+					move(from, to);
+				}
+				
+				delete(initResponse.getDeleteFilesList());
+				online=true;
+				
+				replicationServer.getStore().open();
+				
+				LOG.info("Slave is now online.  We are now eligible to become the master.");
 			}
 			
-			delete(initResponse.getDeleteFilesList());
-			bulkSynchronizing=false;
+			// Let the master know we are now online.
+			ReplicationFrame frame = new ReplicationFrame();
+			frame.setHeader(new PBHeader().setType(PBType.SLAVE_ONLINE));
+			transport.oneway(frame);
 			
-			replicationServer.getStore().open();
-			
-			LOG.info("Slave is now online.  We are now eligible to become the master.");
+		} catch (Throwable e) {
+			e.printStackTrace();
+			failed(e);
 		}
-		
-		// Let the master know we are now online.
-		ReplicationFrame frame = new ReplicationFrame();
-		frame.setHeader(new PBHeader().setType(PBType.SLAVE_ONLINE));
-		transport.oneway(frame);
-		
-		replicationServer.getStore().incrementalRecover();
 	}
 
 	private void onSlaveInitResponse(ReplicationFrame frame, PBSlaveInitResponse response) throws Exception {
@@ -320,11 +398,9 @@
 		}
 	}
 
-	LinkedList<TransferSession> transferSessions = new LinkedList<TransferSession>();
-
-	private void addTransferSession() throws Exception {
+	private void addTransferSession() {
 		synchronized (transferMutex) {
-			while (!transferQueue.isEmpty() && transferSessions.size() < MAX_TRANSFER_SESSIONS) {
+			while (transport!=null && !transferQueue.isEmpty() && transferSessions.size() < MAX_TRANSFER_SESSIONS) {
 				TransferSession transferSession = new TransferSession();
 				transferSessions.add(transferSession);
 				try {
@@ -357,11 +433,11 @@
 			} finally {
 				try {
 					is.close();
-				} finally {
+				} catch(Throwable e) {
 				}
 				try {
 					os.close();
-				} finally {
+				} catch(Throwable e) {
 				}
 			}
 			from.delete();
@@ -418,22 +494,11 @@
 					}
 					info = null;
 				}
-				Thread stopThread = new Thread("Transfer Session Shutdown: " + transport.getRemoteAddress()) {
-					@Override
-					public void run() {
-						try {
-							transport.stop();
-							synchronized (transferMutex) {
-								transferSessions.remove(TransferSession.this);
-								addTransferSession();
-							}
-						} catch (Exception e) {
-							e.printStackTrace();
-						}
-					}
-				};
-				stopThread.setDaemon(true);
-				stopThread.start();
+				transport.stop();
+				synchronized (transferMutex) {
+					transferSessions.remove(TransferSession.this);
+					addTransferSession();
+				}
 			}
 		}
 

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Wed Nov 12 08:31:48 2008
@@ -149,16 +149,16 @@
     protected boolean syncWrites=true;
     int checkpointInterval = 5*1000;
     int cleanupInterval = 30*1000;
-    boolean opened;
     
     protected AtomicBoolean started = new AtomicBoolean();
+    protected AtomicBoolean opened = new AtomicBoolean();
 
     public MessageDatabase() {
     }
 
     public void start() throws Exception {
         if (started.compareAndSet(false, true)) {
-            load();
+        	load();
         }
     }
 
@@ -211,7 +211,7 @@
 	}
 	
 	public void open() throws IOException {
-		if( !opened ) {
+		if( opened.compareAndSet(false, true) ) {
 	        getJournal();
 	        if (failIfJournalIsLocked) {
 	            journal.lock();
@@ -232,74 +232,75 @@
 	        getPageFile();
 	        journal.start();
 	        loadPageFile();
-	        opened=true;
+	        
+	        checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+	            public void run() {
+	                try {
+	                    long lastCleanup = System.currentTimeMillis();
+	                    long lastCheckpoint = System.currentTimeMillis();
+	                    
+	                    // Sleep for a short time so we can periodically check 
+	                    // to see if we need to exit this thread.
+	                    long sleepTime = Math.min(checkpointInterval, 500);
+	                    while (opened.get()) {
+	                        Thread.sleep(sleepTime);
+	                        long now = System.currentTimeMillis();
+	                        if( now - lastCleanup >= cleanupInterval ) {
+	                            checkpointCleanup(true);
+	                            lastCleanup = now;
+	                            lastCheckpoint = now;
+	                        } else if( now - lastCheckpoint >= checkpointInterval ) {
+	                            checkpointCleanup(false);
+	                            lastCheckpoint = now;
+	                        }
+	                    }
+	                } catch (InterruptedException e) {
+	                    // Looks like someone really wants us to exit this thread...
+	                }
+	            }
+	        };
+	        checkpointThread.start();
+            recover();
 		}
 	}
 	
     public void load() throws IOException {
     	
-    	open();
-        if (deleteAllMessages) {
-            journal.delete();
-
-            pageFile.unload();
-            pageFile.delete();
-            metadata = new Metadata();
-            
-            LOG.info("Persistence store purged.");
-            deleteAllMessages = false;
-            
-            loadPageFile();
-        }
-    	
         synchronized (indexMutex) {
-            recover();
+	    	open();
+	    	
+	        if (deleteAllMessages) {
+	            journal.delete();
+	
+	            pageFile.unload();
+	            pageFile.delete();
+	            metadata = new Metadata();
+	            
+	            LOG.info("Persistence store purged.");
+	            deleteAllMessages = false;
+	            
+	            loadPageFile();
+	        }
+	        store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
+
         }
 
-        checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
-            public void run() {
-                try {
-                    long lastCleanup = System.currentTimeMillis();
-                    long lastCheckpoint = System.currentTimeMillis();
-                    
-                    // Sleep for a short time so we can periodically check 
-                    // to see if we need to exit this thread.
-                    long sleepTime = Math.min(checkpointInterval, 500);
-                    while (started.get()) {
-                        Thread.sleep(sleepTime);
-                        long now = System.currentTimeMillis();
-                        if( now - lastCleanup >= cleanupInterval ) {
-                            checkpointCleanup(true);
-                            lastCleanup = now;
-                            lastCheckpoint = now;
-                        } else if( now - lastCheckpoint >= checkpointInterval ) {
-                            checkpointCleanup(false);
-                            lastCheckpoint = now;
-                        }
-                    }
-                } catch (InterruptedException e) {
-                    // Looks like someone really wants us to exit this thread...
-                }
-            }
-        };
-        checkpointThread.start();
     }
 
     
-	public void close() throws IOException {
-        synchronized (indexMutex) {
-            pageFile.unload();
-            metadata = new Metadata();
-        }
-        journal.close();
-        opened=false;
+	public void close() throws IOException, InterruptedException {
+		if( opened.compareAndSet(true, false)) {
+	        synchronized (indexMutex) {
+	            pageFile.unload();
+	            metadata = new Metadata();
+	        }
+	        journal.close();
+	        checkpointThread.join();
+		}
 	}
 	
     public void unload() throws IOException, InterruptedException {
-        checkpointThread.join();
-
         synchronized (indexMutex) {
-            
             metadata.state = CLOSED_STATE;
             metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
 
@@ -308,13 +309,8 @@
                     tx.store(metadata.page, metadataMarshaller, true);
                 }
             });
-
-            pageFile.unload();
-            metadata = new Metadata();
+            close();
         }
-        store(new KahaTraceCommand().setMessage("CLEAN SHUTDOWN " + new Date()));
-        journal.close();
-        opened=false;
     }
 
     /**
@@ -356,6 +352,7 @@
 
         while (recoveryPosition != null) {
             JournalCommand message = load(recoveryPosition);
+            metadata.lastUpdate = recoveryPosition;
             process(message, recoveryPosition);
             redoCounter++;
             recoveryPosition = journal.getNextLocation(recoveryPosition);
@@ -377,6 +374,7 @@
         }
         while (nextRecoveryPosition != null) {
         	lastRecoveryPosition = nextRecoveryPosition;
+            metadata.lastUpdate = lastRecoveryPosition;
             JournalCommand message = load(lastRecoveryPosition);
             process(message, lastRecoveryPosition);            
             nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
@@ -403,15 +401,17 @@
     protected void checkpointCleanup(final boolean cleanup) {
         try {
             synchronized (indexMutex) {
+            	if( !opened.get() ) {
+            		return;
+            	}
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         checkpointUpdate(tx, cleanup);
                     }
                 });
-                pageFile.flush();
             }
-            store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
         } catch (IOException e) {
+        	e.printStackTrace();
         }
     }
 
@@ -426,7 +426,6 @@
             pageFile.flush();
             closure.execute();
         }
-        store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
 	}
 
     // /////////////////////////////////////////////////////////////////



Mime
View raw message