activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r476310 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid: RapidMessageReference.java RapidMessageStore.java RapidPersistenceAdapter.java RapidTopicMessageStore.java RapidTransactionStore.java
Date Fri, 17 Nov 2006 20:53:43 GMT
Author: chirino
Date: Fri Nov 17 12:53:42 2006
New Revision: 476310

URL: http://svn.apache.org/viewvc?view=rev&rev=476310
Log:
switch from using the RecordLocation interface to the Location interface since the adapter
will need to 
be aware of what log file the active records are in so that it can delete un-used log files.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java?view=diff&rev=476310&r1=476309&r2=476310
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java
Fri Nov 17 12:53:42 2006
@@ -18,30 +18,24 @@
 
 package org.apache.activemq.store.rapid;
 
-import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.journal.active.Location;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 
 public class RapidMessageReference {
     public final MessageId messageId;
-    public final long expiration;
-    public final RecordLocation location;
+    public final Location location;
     
-    public RapidMessageReference(Message message, RecordLocation location) {
+    public RapidMessageReference(Message message, Location location) {
         this.messageId = message.getMessageId();
-        this.expiration = message.getExpiration();
         this.location=location;
     }
 
-    public long getExpiration() {
-        return expiration;
-    }
-
     public MessageId getMessageId() {
         return messageId;
     }
     
-    public RecordLocation getLocation() {
+    public Location getLocation() {
         return location;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java?view=diff&rev=476310&r1=476309&r2=476310
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
Fri Nov 17 12:53:42 2006
@@ -23,7 +23,6 @@
 import java.util.HashSet;
 import java.util.Iterator;
 
-import org.apache.activeio.journal.RecordLocation;
 import org.apache.activeio.journal.active.Location;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -61,7 +60,7 @@
 //    /** A MessageStore that we can use to retrieve messages quickly. */
 //    private LinkedHashMap cpAddedMessageIds;
     
-    protected RecordLocation lastLocation;
+    protected Location lastLocation;
     protected HashSet inFlightTxLocations = new HashSet();
     
     public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination,
MapContainer container) {
@@ -82,7 +81,7 @@
         final MessageId id = message.getMessageId();
         
         final boolean debug = log.isDebugEnabled();        
-        final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
+        final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
         final RapidMessageReference md = new RapidMessageReference(message, location);
         
         if( !context.isInTransaction() ) {
@@ -127,19 +126,19 @@
         }
     }
     
-    static protected String toString(RecordLocation location) {
+    static protected String toString(Location location) {
         Location l = (Location) location;
         return l.getLogFileId()+":"+l.getLogFileOffset();
     }
 
-    static protected RecordLocation toRecordLocation(String t) {
+    static protected Location toLocation(String t) {
         String[] strings = t.split(":");
         if( strings.length!=2 )
             throw new IllegalArgumentException("Invalid location: "+t);
         return new Location(Integer.parseInt(strings[0]),Integer.parseInt(strings[1]));
     }
 
-    public void replayAddMessage(ConnectionContext context, Message message, RecordLocation
location) {
+    public void replayAddMessage(ConnectionContext context, Message message, Location location)
{
         try {
             RapidMessageReference messageReference = new RapidMessageReference(message, location);
             messageContainer.put(message.getMessageId().toString(), messageReference);
@@ -157,7 +156,7 @@
         remove.setDestination(destination);
         remove.setMessageAck(ack);
         
-        final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
+        final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
         if( !context.isInTransaction() ) {
             if( debug )
                 log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at:
"+location);
@@ -190,7 +189,7 @@
         }
     }
     
-    private void removeMessage(final MessageAck ack, final RecordLocation location) {
+    private void removeMessage(final MessageAck ack, final Location location) {
         synchronized (this) {
             lastLocation = location;
             MessageId id = ack.getLastMessageId();
@@ -270,7 +269,7 @@
      * @return
      * @throws IOException
      */
-    public RecordLocation checkpoint() throws IOException {
+    public Location checkpoint() throws IOException {
 
         ArrayList cpActiveJournalLocations;
 
@@ -281,7 +280,7 @@
         
         if( cpActiveJournalLocations.size() > 0 ) {
             Collections.sort(cpActiveJournalLocations);
-            return (RecordLocation) cpActiveJournalLocations.get(0);
+            return (Location) cpActiveJournalLocations.get(0);
         } else {
             return lastLocation;
         }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java?view=diff&rev=476310&r1=476309&r2=476310
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
Fri Nov 17 12:53:42 2006
@@ -23,12 +23,22 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activeio.journal.InvalidRecordLocationException;
 import org.apache.activeio.journal.Journal;
 import org.apache.activeio.journal.JournalEventListener;
 import org.apache.activeio.journal.RecordLocation;
 import org.apache.activeio.journal.active.JournalImpl;
+import org.apache.activeio.journal.active.Location;
 import org.apache.activeio.packet.ByteArrayPacket;
 import org.apache.activeio.packet.Packet;
 import org.apache.activemq.broker.ConnectionContext;
@@ -67,16 +77,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * An implementation of {@link PersistenceAdapter} designed for use with a
  * {@link Journal} and then check pointing asynchronously on a timeout with some
@@ -471,14 +471,14 @@
      */
     private void recover() throws IllegalStateException, InvalidRecordLocationException,
IOException, IOException {
 
-        RecordLocation pos = null;
+        Location pos = null;
         int transactionCounter = 0;
 
         log.info("Journal Recovery Started.");
         ConnectionContext context = new ConnectionContext();
 
         // While we have records in the journal.
-        while ((pos = journal.getNextRecordLocation(pos)) != null) {
+        while ((pos = (Location) journal.getNextRecordLocation(pos)) != null) {
             Packet data = journal.read(pos);
             DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data));
 
@@ -603,9 +603,9 @@
      * @return
      * @throws IOException
      */
-    public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException
{
+    public Location writeCommand(DataStructure command, boolean sync) throws IOException
{
         if( started.get() )
-            return journal.write(toPacket(wireFormat.marshal(command)), sync);
+            return (Location) journal.write(toPacket(wireFormat.marshal(command)), sync);
         throw new IOException("closed");
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=476310&r1=476309&r2=476310
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
Fri Nov 17 12:53:42 2006
@@ -22,7 +22,8 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activeio.journal.RecordLocation;
+
+import org.apache.activeio.journal.active.Location;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.JournalTopicAck;
@@ -170,7 +171,7 @@
         ack.setSubscritionName(subscriptionName);
         ack.setClientId(clientId);
         ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null);
-        final RecordLocation location=peristenceAdapter.writeCommand(ack,false);
+        final Location location=peristenceAdapter.writeCommand(ack,false);
         final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
         if(!context.isInTransaction()){
             if(debug)
@@ -236,7 +237,7 @@
      * @param location
      * @param key
      */
-    private void acknowledge(MessageId messageId,RecordLocation location,SubscriptionKey
key){
+    private void acknowledge(MessageId messageId,Location location,SubscriptionKey key){
         synchronized(this){
             lastLocation=location;
             ackedLastAckLocations.put(key,messageId);
@@ -265,17 +266,17 @@
         return result;
     }
 
-    public RecordLocation checkpoint() throws IOException{
+    public Location checkpoint() throws IOException{
         ArrayList cpAckedLastAckLocations;
         // swap out the hash maps..
         synchronized(this){
             cpAckedLastAckLocations=new ArrayList(this.ackedLastAckLocations.values());
             this.ackedLastAckLocations=new HashMap();
         }
-        RecordLocation rc=super.checkpoint();
+        Location rc=super.checkpoint();
         if(!cpAckedLastAckLocations.isEmpty()){
             Collections.sort(cpAckedLastAckLocations);
-            RecordLocation t=(RecordLocation)cpAckedLastAckLocations.get(0);
+            Location t=(Location)cpAckedLastAckLocations.get(0);
             if(rc==null||t.compareTo(rc)<0){
                 rc=t;
             }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java?view=diff&rev=476310&r1=476309&r2=476310
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java
Fri Nov 17 12:53:42 2006
@@ -20,10 +20,11 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.transaction.xa.XAException;
 
-import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.journal.active.Location;
 import org.apache.activemq.command.JournalTopicAck;
 import org.apache.activemq.command.JournalTransaction;
 import org.apache.activemq.command.Message;
@@ -33,8 +34,6 @@
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
 
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  */
 public class RapidTransactionStore implements TransactionStore {
@@ -54,9 +53,9 @@
         public byte operationType;
         public RapidMessageStore store;
         public Object data;
-        public RecordLocation location;
+        public Location location;
         
-        public TxOperation(byte operationType, RapidMessageStore store, Object data, RecordLocation
location) {
+        public TxOperation(byte operationType, RapidMessageStore store, Object data, Location
location) {
             this.operationType=operationType;
             this.store=store;
             this.data=data;
@@ -70,22 +69,22 @@
      */
     public static class Tx {
 
-        private final RecordLocation location;
+        private final Location location;
         private ArrayList operations = new ArrayList();
 
-        public Tx(RecordLocation location) {
+        public Tx(Location location) {
             this.location=location;
         }
 
-        public void add(RapidMessageStore store, Message msg, RecordLocation loc) {
+        public void add(RapidMessageStore store, Message msg, Location loc) {
             operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, loc));
         }
 
-        public void add(RapidMessageStore store, MessageAck ack, RecordLocation loc) {
+        public void add(RapidMessageStore store, MessageAck ack, Location loc) {
             operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack,
loc));
         }
 
-        public void add(RapidTopicMessageStore store, JournalTopicAck ack, RecordLocation
loc) {
+        public void add(RapidTopicMessageStore store, JournalTopicAck ack, Location loc)
{
             operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, loc));
         }
         
@@ -148,7 +147,7 @@
         preparedTransactions.put(txid, tx);
     }
 
-    public Tx getTx(Object txid, RecordLocation location) {
+    public Tx getTx(Object txid, Location location) {
         Tx tx = (Tx) inflightTransactions.get(txid);
         if (tx == null) {
             tx = new Tx(location);
@@ -249,7 +248,7 @@
      * @param message
      * @throws IOException
      */
-    void addMessage(RapidMessageStore store, Message message, RecordLocation location) throws
IOException {
+    void addMessage(RapidMessageStore store, Message message, Location location) throws IOException
{
         Tx tx = getTx(message.getTransactionId(), location);
         tx.add(store, message, location);
     }
@@ -258,19 +257,19 @@
      * @param ack
      * @throws IOException
      */
-    public void removeMessage(RapidMessageStore store, MessageAck ack, RecordLocation location)
throws IOException {
+    public void removeMessage(RapidMessageStore store, MessageAck ack, Location location)
throws IOException {
         Tx tx = getTx(ack.getTransactionId(), location);
         tx.add(store, ack, location);
     }
     
     
-    public void acknowledge(RapidTopicMessageStore store, JournalTopicAck ack, RecordLocation
location) {
+    public void acknowledge(RapidTopicMessageStore store, JournalTopicAck ack, Location location)
{
         Tx tx = getTx(ack.getTransactionId(), location);
         tx.add(store, ack, location);
     }
 
 
-    public RecordLocation checkpoint() throws IOException {
+    public Location checkpoint() throws IOException {
         
         // Nothing really to checkpoint.. since, we don't
         // checkpoint tx operations in to long term store until they are committed.
@@ -278,17 +277,17 @@
         // But we keep track of the first location of an operation
         // that was associated with an active tx. The journal can not
         // roll over active tx records.        
-        RecordLocation rc = null;
+        Location rc = null;
         for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) {
             Tx tx = (Tx) iter.next();
-            RecordLocation location = tx.location;
+            Location location = tx.location;
             if (rc == null || rc.compareTo(location) < 0) {
                 rc = location;
             }
         }
         for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) {
             Tx tx = (Tx) iter.next();
-            RecordLocation location = tx.location;
+            Location location = tx.location;
             if (rc == null || rc.compareTo(location) < 0) {
                 rc = location;
             }



Mime
View raw message