activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r564814 [4/8] - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/ activemq-core/src/main/java/org/apache/activemq/advisory/ activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/ac...
Date Sat, 11 Aug 2007 00:49:31 GMT
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Fri Aug 10 17:49:19 2007
@@ -24,20 +24,18 @@
  */
 public class Statements {
 
-    private String tablePrefix = "";
     protected String messageTableName = "ACTIVEMQ_MSGS";
     protected String durableSubAcksTableName = "ACTIVEMQ_ACKS";
     protected String lockTableName = "ACTIVEMQ_LOCK";
-
     protected String binaryDataType = "BLOB";
     protected String containerNameDataType = "VARCHAR(250)";
     protected String msgIdDataType = "VARCHAR(250)";
     protected String sequenceDataType = "INTEGER";
     protected String longDataType = "BIGINT";
     protected String stringIdDataType = "VARCHAR(250)";
+    protected boolean useExternalMessageReferences;
 
-    protected boolean useExternalMessageReferences = false;
-
+    private String tablePrefix = "";
     private String addMessageStatement;
     private String updateMessageStatement;
     private String removeMessageStatment;
@@ -96,7 +94,7 @@
     public String[] getDropSchemaStatements() {
         if (dropSchemaStatements == null) {
             dropSchemaStatements = new String[] {"DROP TABLE " + getFullAckTableName() + "",
-                                                 "DROP TABLE " + getFullMessageTableName() + "",};
+                                                 "DROP TABLE " + getFullMessageTableName() + ""};
         }
         return dropSchemaStatements;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Aug 10 17:49:19 2007
@@ -90,7 +90,7 @@
                 // created already.
                 try {
                     LOG.debug("Executing SQL: " + createStatments[i]);
-                    boolean rc = s.execute(createStatments[i]);
+                    s.execute(createStatments[i]);
                 } catch (SQLException e) {
                     if (alreadyExists) {
                         LOG.debug("Could not create JDBC tables; The message table already existed."
@@ -122,7 +122,7 @@
                 // This will fail usually since the tables will be
                 // created already.
                 try {
-                    boolean rc = s.execute(dropStatments[i]);
+                    s.execute(dropStatments[i]);
                 } catch (SQLException e) {
                     LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: "
                              + dropStatments[i] + " Message: " + e.getMessage() + " SQLState: "
@@ -515,7 +515,7 @@
             s = c.getConnection().prepareStatement(statements.getFindAllDurableSubsStatement());
             s.setString(1, destination.getQualifiedName());
             rs = s.executeQuery();
-            ArrayList rc = new ArrayList();
+            ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
             while (rs.next()) {
                 SubscriptionInfo subscription = new SubscriptionInfo();
                 subscription.setDestination(destination);
@@ -526,7 +526,7 @@
                     .createDestination(rs.getString(4), ActiveMQDestination.QUEUE_TYPE));
                 rc.add(subscription);
             }
-            return (SubscriptionInfo[])rc.toArray(new SubscriptionInfo[rc.size()]);
+            return rc.toArray(new SubscriptionInfo[rc.size()]);
         } finally {
             close(rs);
             close(s);
@@ -616,17 +616,15 @@
         }
     }
 
-    public Set doGetDestinations(TransactionContext c) throws SQLException, IOException {
-        HashSet rc = new HashSet();
+    public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
+        HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
             s = c.getConnection().prepareStatement(statements.getFindAllDestinationsStatement());
             rs = s.executeQuery();
             while (rs.next()) {
-                rc
-                    .add(ActiveMQDestination.createDestination(rs.getString(1),
-                                                               ActiveMQDestination.QUEUE_TYPE));
+                rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
             }
         } finally {
             close(rs);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Fri Aug 10 17:49:19 2007
@@ -234,6 +234,7 @@
      * @return
      * @throws IOException
      */
+    @SuppressWarnings("unchecked")
     public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException {
 
         final List<MessageAck> cpRemovedMessageLocations;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Fri Aug 10 17:49:19 2007
@@ -86,12 +86,12 @@
 
     private final WireFormat wireFormat = new OpenWireFormat();
 
-    private final ConcurrentHashMap queues = new ConcurrentHashMap();
-    private final ConcurrentHashMap topics = new ConcurrentHashMap();
+    private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
+    private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
 
     private UsageManager usageManager;
-    long checkpointInterval = 1000 * 60 * 5;
-    long lastCheckpointRequest = System.currentTimeMillis();
+    private long checkpointInterval = 1000 * 60 * 5;
+    private long lastCheckpointRequest = System.currentTimeMillis();
     private long lastCleanup = System.currentTimeMillis();
     private int maxCheckpointWorkers = 10;
     private int maxCheckpointMessageAddSize = 1024 * 1024;
@@ -107,20 +107,6 @@
 
     private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
 
-    final Runnable createPeriodicCheckpointTask() {
-        return new Runnable() {
-            public void run() {
-                long lastTime = 0;
-                synchronized (this) {
-                    lastTime = lastCheckpointRequest;
-                }
-                if (System.currentTimeMillis() > lastTime + checkpointInterval) {
-                    checkpoint(false, true);
-                }
-            }
-        };
-    }
-
     public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
 
         this.journal = journal;
@@ -135,6 +121,20 @@
         this.longTermPersistence = longTermPersistence;
     }
 
+    final Runnable createPeriodicCheckpointTask() {
+        return new Runnable() {
+            public void run() {
+                long lastTime = 0;
+                synchronized (this) {
+                    lastTime = lastCheckpointRequest;
+                }
+                if (System.currentTimeMillis() > lastTime + checkpointInterval) {
+                    checkpoint(false, true);
+                }
+            }
+        };
+    }
+
     /**
      * @param usageManager The UsageManager that is controlling the
      *                destination's memory usage.
@@ -144,8 +144,8 @@
         longTermPersistence.setUsageManager(usageManager);
     }
 
-    public Set getDestinations() {
-        Set destinations = new HashSet(longTermPersistence.getDestinations());
+    public Set<ActiveMQDestination> getDestinations() {
+        Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
         destinations.addAll(queues.keySet());
         destinations.addAll(topics.keySet());
         return destinations;
@@ -160,7 +160,7 @@
     }
 
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
-        JournalMessageStore store = (JournalMessageStore)queues.get(destination);
+        JournalMessageStore store = queues.get(destination);
         if (store == null) {
             MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
             store = new JournalMessageStore(this, checkpointStore, destination);
@@ -170,7 +170,7 @@
     }
 
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
-        JournalTopicMessageStore store = (JournalTopicMessageStore)topics.get(destinationName);
+        JournalTopicMessageStore store = topics.get(destinationName);
         if (store == null) {
             TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
             store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
@@ -204,7 +204,7 @@
             return;
         }
 
-        checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
+        checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable runable) {
                 Thread t = new Thread(runable, "Journal checkpoint worker");
                 t.setPriority(7);
@@ -343,7 +343,7 @@
             LOG.debug("Checkpoint started.");
             RecordLocation newMark = null;
 
-            ArrayList futureTasks = new ArrayList(queues.size() + topics.size());
+            ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());
 
             //
             // We do many partial checkpoints (fullCheckpoint==false) to move
@@ -357,12 +357,12 @@
             // checkpoint queues on the fullCheckpoint cycles.
             //
             if (fullCheckpoint) {
-                Iterator iterator = queues.values().iterator();
+                Iterator<JournalMessageStore> iterator = queues.values().iterator();
                 while (iterator.hasNext()) {
                     try {
-                        final JournalMessageStore ms = (JournalMessageStore)iterator.next();
-                        FutureTask task = new FutureTask(new Callable() {
-                            public Object call() throws Exception {
+                        final JournalMessageStore ms = iterator.next();
+                        FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
+                            public RecordLocation call() throws Exception {
                                 return ms.checkpoint();
                             }
                         });
@@ -374,12 +374,12 @@
                 }
             }
 
-            Iterator iterator = topics.values().iterator();
+            Iterator<JournalTopicMessageStore> iterator = topics.values().iterator();
             while (iterator.hasNext()) {
                 try {
-                    final JournalTopicMessageStore ms = (JournalTopicMessageStore)iterator.next();
-                    FutureTask task = new FutureTask(new Callable() {
-                        public Object call() throws Exception {
+                    final JournalTopicMessageStore ms = iterator.next();
+                    FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
+                        public RecordLocation call() throws Exception {
                             return ms.checkpoint();
                         }
                     });
@@ -391,9 +391,9 @@
             }
 
             try {
-                for (Iterator iter = futureTasks.iterator(); iter.hasNext();) {
-                    FutureTask ft = (FutureTask)iter.next();
-                    RecordLocation mark = (RecordLocation)ft.get();
+                for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) {
+                    FutureTask<RecordLocation> ft = iter.next();
+                    RecordLocation mark = ft.get();
                     // We only set a newMark on full checkpoints.
                     if (fullCheckpoint) {
                         if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java Fri Aug 10 17:49:19 2007
@@ -48,9 +48,9 @@
     private TaskRunnerFactory taskRunnerFactory;
     private Journal journal;
     private boolean useJournal = true;
-    private boolean useQuickJournal = false;
+    private boolean useQuickJournal;
     private File journalArchiveDirectory;
-    private boolean failIfJournalIsLocked = false;
+    private boolean failIfJournalIsLocked;
     private int journalThreadPriority = Thread.MAX_PRIORITY;
     private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Fri Aug 10 17:49:19 2007
@@ -45,7 +45,7 @@
     private static final Log LOG = LogFactory.getLog(JournalTopicMessageStore.class);
 
     private TopicMessageStore longTermStore;
-    private HashMap ackedLastAckLocations = new HashMap();
+    private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
 
     public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore,
                                     ActiveMQTopic destinationName) {
@@ -160,22 +160,22 @@
 
     public RecordLocation checkpoint() throws IOException {
 
-        final HashMap cpAckedLastAckLocations;
+        final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
 
         // swap out the hash maps..
         synchronized (this) {
             cpAckedLastAckLocations = this.ackedLastAckLocations;
-            this.ackedLastAckLocations = new HashMap();
+            this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
         }
 
         return super.checkpoint(new Callback() {
             public void execute() throws Exception {
 
                 // Checkpoint the acknowledged messages.
-                Iterator iterator = cpAckedLastAckLocations.keySet().iterator();
+                Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
                 while (iterator.hasNext()) {
-                    SubscriptionKey subscriptionKey = (SubscriptionKey)iterator.next();
-                    MessageId identity = (MessageId)cpAckedLastAckLocations.get(subscriptionKey);
+                    SubscriptionKey subscriptionKey = iterator.next();
+                    MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
                     longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
                                               subscriptionKey.subscriptionName, identity);
                 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java Fri Aug 10 17:49:19 2007
@@ -40,8 +40,8 @@
 public class JournalTransactionStore implements TransactionStore {
 
     private final JournalPersistenceAdapter peristenceAdapter;
-    Map inflightTransactions = new LinkedHashMap();
-    Map preparedTransactions = new LinkedHashMap();
+    private Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>();
+    private Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
     private boolean doingRecover;
 
     public static class TxOperation {
@@ -70,7 +70,7 @@
     public static class Tx {
 
         private final RecordLocation location;
-        private ArrayList operations = new ArrayList();
+        private ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
 
         public Tx(RecordLocation location) {
             this.location = location;
@@ -89,9 +89,9 @@
         }
 
         public Message[] getMessages() {
-            ArrayList list = new ArrayList();
-            for (Iterator iter = operations.iterator(); iter.hasNext();) {
-                TxOperation op = (TxOperation)iter.next();
+            ArrayList<Object> list = new ArrayList<Object>();
+            for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
+                TxOperation op = iter.next();
                 if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
                     list.add(op.data);
                 }
@@ -102,9 +102,9 @@
         }
 
         public MessageAck[] getAcks() {
-            ArrayList list = new ArrayList();
-            for (Iterator iter = operations.iterator(); iter.hasNext();) {
-                TxOperation op = (TxOperation)iter.next();
+            ArrayList<Object> list = new ArrayList<Object>();
+            for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
+                TxOperation op = iter.next();
                 if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
                     list.add(op.data);
                 }
@@ -114,7 +114,7 @@
             return rc;
         }
 
-        public ArrayList getOperations() {
+        public ArrayList<TxOperation> getOperations() {
             return operations;
         }
 
@@ -131,7 +131,7 @@
     public void prepare(TransactionId txid) throws IOException {
         Tx tx = null;
         synchronized (inflightTransactions) {
-            tx = (Tx)inflightTransactions.remove(txid);
+            tx = inflightTransactions.remove(txid);
         }
         if (tx == null) {
             return;
@@ -150,7 +150,7 @@
     public void replayPrepare(TransactionId txid) throws IOException {
         Tx tx = null;
         synchronized (inflightTransactions) {
-            tx = (Tx)inflightTransactions.remove(txid);
+            tx = inflightTransactions.remove(txid);
         }
         if (tx == null) {
             return;
@@ -163,7 +163,7 @@
     public Tx getTx(Object txid, RecordLocation location) {
         Tx tx = null;
         synchronized (inflightTransactions) {
-            tx = (Tx)inflightTransactions.get(txid);
+            tx = inflightTransactions.get(txid);
         }
         if (tx == null) {
             tx = new Tx(location);
@@ -180,11 +180,11 @@
         Tx tx;
         if (wasPrepared) {
             synchronized (preparedTransactions) {
-                tx = (Tx)preparedTransactions.remove(txid);
+                tx = preparedTransactions.remove(txid);
             }
         } else {
             synchronized (inflightTransactions) {
-                tx = (Tx)inflightTransactions.remove(txid);
+                tx = inflightTransactions.remove(txid);
             }
         }
         if (tx == null) {
@@ -206,11 +206,11 @@
     public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
         if (wasPrepared) {
             synchronized (preparedTransactions) {
-                return (Tx)preparedTransactions.remove(txid);
+                return preparedTransactions.remove(txid);
             }
         } else {
             synchronized (inflightTransactions) {
-                return (Tx)inflightTransactions.remove(txid);
+                return inflightTransactions.remove(txid);
             }
         }
     }
@@ -222,11 +222,11 @@
     public void rollback(TransactionId txid) throws IOException {
         Tx tx = null;
         synchronized (inflightTransactions) {
-            tx = (Tx)inflightTransactions.remove(txid);
+            tx = inflightTransactions.remove(txid);
         }
         if (tx != null) {
             synchronized (preparedTransactions) {
-                tx = (Tx)preparedTransactions.remove(txid);
+                tx = preparedTransactions.remove(txid);
             }
         }
         if (tx != null) {
@@ -269,13 +269,13 @@
         }
         this.doingRecover = true;
         try {
-            Map txs = null;
+            Map<TransactionId, Tx> txs = null;
             synchronized (preparedTransactions) {
-                txs = new LinkedHashMap(preparedTransactions);
+                txs = new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
             }
-            for (Iterator iter = txs.keySet().iterator(); iter.hasNext();) {
-                Object txid = (Object)iter.next();
-                Tx tx = (Tx)txs.get(txid);
+            for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) {
+                Object txid = iter.next();
+                Tx tx = txs.get(txid);
                 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
             }
         } finally {
@@ -316,8 +316,8 @@
         // roll over active tx records.
         RecordLocation rc = null;
         synchronized (inflightTransactions) {
-            for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) {
-                Tx tx = (Tx)iter.next();
+            for (Iterator<Tx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
+                Tx tx = iter.next();
                 RecordLocation location = tx.location;
                 if (rc == null || rc.compareTo(location) < 0) {
                     rc = location;
@@ -325,8 +325,8 @@
             }
         }
         synchronized (preparedTransactions) {
-            for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) {
-                Tx tx = (Tx)iter.next();
+            for (Iterator<Tx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
+                Tx tx = iter.next();
                 RecordLocation location = tx.location;
                 if (rc == null || rc.compareTo(location) < 0) {
                     rc = location;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Fri Aug 10 17:49:19 2007
@@ -38,7 +38,7 @@
 
     protected final ActiveMQDestination destination;
     protected final MapContainer<MessageId, Message> messageContainer;
-    protected StoreEntry batchEntry = null;
+    protected StoreEntry batchEntry;
 
     public KahaMessageStore(MapContainer<MessageId, Message> container, ActiveMQDestination destination)
         throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Fri Aug 10 17:49:19 2007
@@ -50,19 +50,20 @@
 
 /**
  * @org.apache.xbean.XBean
- * 
  * @version $Revision: 1.4 $
  */
 public class KahaPersistenceAdapter implements PersistenceAdapter {
 
     private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
     private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
-    static final String PREPARED_TRANSACTIONS_NAME = "PreparedTransactions";
-    KahaTransactionStore transactionStore;
-    ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, TopicMessageStore>();
-    ConcurrentHashMap<ActiveMQQueue, MessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, MessageStore>();
-    ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
+    private static final String PREPARED_TRANSACTIONS_NAME = "PreparedTransactions";
+
     protected OpenWireFormat wireFormat = new OpenWireFormat();
+    protected KahaTransactionStore transactionStore;
+    protected ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, TopicMessageStore>();
+    protected ConcurrentHashMap<ActiveMQQueue, MessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, MessageStore>();
+    protected ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
+
     private long maxDataFileLength = 32 * 1024 * 1024;
     private File directory;
     private String brokerName;
@@ -198,10 +199,10 @@
         return container;
     }
 
-    protected MapContainer<String, Object> getSubsMapContainer(Object id, String containerName)
+    protected MapContainer getSubsMapContainer(Object id, String containerName)
         throws IOException {
         Store store = getStore();
-        MapContainer<String, Object> container = store.getMapContainer(id, containerName);
+        MapContainer container = store.getMapContainer(id, containerName);
         container.setKeyMarshaller(Store.STRING_MARSHALLER);
         container.setValueMarshaller(createMessageMarshaller());
         container.load();
@@ -236,7 +237,6 @@
 
     /**
      * @param maxDataFileLength the maxDataFileLength to set
-     * 
      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
      */
     public void setMaxDataFileLength(long maxDataFileLength) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Fri Aug 10 17:49:19 2007
@@ -33,10 +33,10 @@
     protected final ActiveMQDestination destination;
     protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
     protected KahaReferenceStoreAdapter adapter;
-    private StoreEntry batchEntry = null;
-    private String lastBatchId = null;
+    private StoreEntry batchEntry;
+    private String lastBatchId;
 
-    public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer container,
+    public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer<MessageId, ReferenceRecord> container,
                               ActiveMQDestination destination) throws IOException {
         this.adapter = adapter;
         this.messageContainer = container;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Fri Aug 10 17:49:19 2007
@@ -53,9 +53,9 @@
     private static final String RECORD_REFERENCES = "record-references";
     private static final String TRANSACTIONS = "transactions-state";
     private MapContainer stateMap;
-    private MapContainer preparedTransactions;
+    private MapContainer<TransactionId, AMQTx> preparedTransactions;
     private Map<Integer, AtomicInteger> recordReferences = new HashMap<Integer, AtomicInteger>();
-    private ListContainer durableSubscribers;
+    private ListContainer<SubscriptionInfo> durableSubscribers;
     private boolean storeValid;
     private Store stateStore;
 
@@ -130,9 +130,8 @@
         if (rc == null) {
             Store store = getStore();
             MapContainer messageContainer = getMapReferenceContainer(destination, "topic-data");
-            MapContainer subsContainer = getSubsMapContainer(destination.toString() + "-Subscriptions",
-                                                             "blob");
-            ListContainer ackContainer = store.getListContainer(destination.toString(), "topic-acks");
+            MapContainer subsContainer = getSubsMapContainer(destination.toString() + "-Subscriptions", "blob");
+            ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.toString(), "topic-acks");
             ackContainer.setMarshaller(new TopicSubAckMarshaller());
             rc = new KahaTopicReferenceStore(store, this, messageContainer, ackContainer, subsContainer,
                                              destination);
@@ -212,8 +211,8 @@
      * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
      */
     public void recoverState() throws IOException {
-        for (Iterator i = durableSubscribers.iterator(); i.hasNext();) {
-            SubscriptionInfo info = (SubscriptionInfo)i.next();
+        for (Iterator<SubscriptionInfo> i = durableSubscribers.iterator(); i.hasNext();) {
+            SubscriptionInfo info = i.next();
             TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
             ts.addSubsciption(info, false);
         }
@@ -222,9 +221,9 @@
     public Map<TransactionId, AMQTx> retrievePreparedState() throws IOException {
         Map<TransactionId, AMQTx> result = new HashMap<TransactionId, AMQTx>();
         preparedTransactions.load();
-        for (Iterator i = preparedTransactions.keySet().iterator(); i.hasNext();) {
-            TransactionId key = (TransactionId)i.next();
-            AMQTx value = (AMQTx)preparedTransactions.get(key);
+        for (Iterator<TransactionId> i = preparedTransactions.keySet().iterator(); i.hasNext();) {
+            TransactionId key = i.next();
+            AMQTx value = preparedTransactions.get(key);
             result.put(key, value);
         }
         return result;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Fri Aug 10 17:49:19 2007
@@ -39,19 +39,19 @@
 public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore {
 
     protected ListContainer<TopicSubAck> ackContainer;
-    private Map subscriberContainer;
+    protected Map<Object, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<Object, TopicSubContainer>();
+    private Map<String, SubscriptionInfo> subscriberContainer;
     private Store store;
-    protected Map subscriberMessages = new ConcurrentHashMap();
 
-    public KahaTopicMessageStore(Store store, MapContainer messageContainer,
-                                 ListContainer<TopicSubAck> ackContainer, MapContainer subsContainer,
+    public KahaTopicMessageStore(Store store, MapContainer<MessageId, Message> messageContainer,
+                                 ListContainer<TopicSubAck> ackContainer, MapContainer<String, SubscriptionInfo> subsContainer,
                                  ActiveMQDestination destination) throws IOException {
         super(messageContainer, destination);
         this.store = store;
         this.ackContainer = ackContainer;
         subscriberContainer = subsContainer;
         // load all the Ack containers
-        for (Iterator i = subscriberContainer.keySet().iterator(); i.hasNext();) {
+        for (Iterator<String> i = subscriberContainer.keySet().iterator(); i.hasNext();) {
             Object key = i.next();
             addSubscriberMessageContainer(key);
         }
@@ -67,8 +67,8 @@
             tsa.setCount(subscriberCount);
             tsa.setMessageEntry(messageEntry);
             StoreEntry ackEntry = ackContainer.placeLast(tsa);
-            for (Iterator i = subscriberMessages.values().iterator(); i.hasNext();) {
-                TopicSubContainer container = (TopicSubContainer)i.next();
+            for (Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
+                TopicSubContainer container = i.next();
                 ConsumerMessageRef ref = new ConsumerMessageRef();
                 ref.setAckEntry(ackEntry);
                 ref.setMessageEntry(messageEntry);
@@ -81,14 +81,14 @@
     public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                                          MessageId messageId) throws IOException {
         String subcriberId = getSubscriptionKey(clientId, subscriptionName);
-        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(subcriberId);
+        TopicSubContainer container = subscriberMessages.get(subcriberId);
         if (container != null) {
             ConsumerMessageRef ref = container.remove(messageId);
             if (container.isEmpty()) {
                 container.reset();
             }
             if (ref != null) {
-                TopicSubAck tsa = (TopicSubAck)ackContainer.get(ref.getAckEntry());
+                TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
                 if (tsa != null) {
                     if (tsa.decrementCount() <= 0) {
                         StoreEntry entry = ref.getAckEntry();
@@ -106,7 +106,7 @@
     }
 
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
-        return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
+        return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
     }
 
     public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
@@ -136,7 +136,7 @@
     public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
         throws Exception {
         String key = getSubscriptionKey(clientId, subscriptionName);
-        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+        TopicSubContainer container = subscriberMessages.get(key);
         if (container != null) {
             for (Iterator i = container.iterator(); i.hasNext();) {
                 ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
@@ -153,7 +153,7 @@
     public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
                                     MessageRecoveryListener listener) throws Exception {
         String key = getSubscriptionKey(clientId, subscriptionName);
-        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+        TopicSubContainer container = subscriberMessages.get(key);
         if (container != null) {
             int count = 0;
             StoreEntry entry = container.getBatchEntry();
@@ -190,7 +190,7 @@
     }
 
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-        return (SubscriptionInfo[])subscriberContainer.values()
+        return subscriberContainer.values()
             .toArray(new SubscriptionInfo[subscriberContainer.size()]);
     }
 
@@ -211,11 +211,11 @@
 
     protected void removeSubscriberMessageContainer(Object key) throws IOException {
         subscriberContainer.remove(key);
-        TopicSubContainer container = (TopicSubContainer)subscriberMessages.remove(key);
+        TopicSubContainer container = subscriberMessages.remove(key);
         for (Iterator i = container.iterator(); i.hasNext();) {
             ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
             if (ref != null) {
-                TopicSubAck tsa = (TopicSubAck)ackContainer.get(ref.getAckEntry());
+                TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
                 if (tsa != null) {
                     if (tsa.decrementCount() <= 0) {
                         ackContainer.remove(ref.getAckEntry());
@@ -231,7 +231,7 @@
 
     public int getMessageCount(String clientId, String subscriberName) throws IOException {
         String key = getSubscriptionKey(clientId, subscriberName);
-        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+        TopicSubContainer container = subscriberMessages.get(key);
         return container != null ? container.size() : 0;
     }
 
@@ -243,15 +243,15 @@
     public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
         messageContainer.clear();
         ackContainer.clear();
-        for (Iterator i = subscriberMessages.values().iterator(); i.hasNext();) {
-            TopicSubContainer container = (TopicSubContainer)i.next();
+        for (Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
+            TopicSubContainer container = i.next();
             container.clear();
         }
     }
 
     public synchronized void resetBatching(String clientId, String subscriptionName) {
         String key = getSubscriptionKey(clientId, subscriptionName);
-        TopicSubContainer topicSubContainer = (TopicSubContainer)subscriberMessages.get(key);
+        TopicSubContainer topicSubContainer = subscriberMessages.get(key);
         if (topicSubContainer != null) {
             topicSubContainer.reset();
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Fri Aug 10 17:49:19 2007
@@ -37,21 +37,21 @@
 public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
 
     protected ListContainer<TopicSubAck> ackContainer;
-    private Map subscriberContainer;
+    protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();
+    private Map<String, SubscriptionInfo> subscriberContainer;
     private Store store;
-    protected Map subscriberMessages = new ConcurrentHashMap();
 
     public KahaTopicReferenceStore(Store store, KahaReferenceStoreAdapter adapter,
-                                   MapContainer messageContainer, ListContainer ackContainer,
-                                   MapContainer subsContainer, ActiveMQDestination destination)
+                                   MapContainer<MessageId, ReferenceRecord> messageContainer, ListContainer<TopicSubAck> ackContainer,
+                                   MapContainer<String, SubscriptionInfo> subsContainer, ActiveMQDestination destination)
         throws IOException {
         super(adapter, messageContainer, destination);
         this.store = store;
         this.ackContainer = ackContainer;
         subscriberContainer = subsContainer;
         // load all the Ack containers
-        for (Iterator i = subscriberContainer.keySet().iterator(); i.hasNext();) {
-            String key = (String)i.next();
+        for (Iterator<String> i = subscriberContainer.keySet().iterator(); i.hasNext();) {
+            String key = i.next();
             addSubscriberMessageContainer(key);
         }
     }
@@ -79,8 +79,8 @@
             tsa.setCount(subscriberCount);
             tsa.setMessageEntry(messageEntry);
             final StoreEntry ackEntry = ackContainer.placeLast(tsa);
-            for (final Iterator i = subscriberMessages.values().iterator(); i.hasNext();) {
-                final TopicSubContainer container = (TopicSubContainer)i.next();
+            for (final Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
+                final TopicSubContainer container = i.next();
                 final ConsumerMessageRef ref = new ConsumerMessageRef();
                 ref.setAckEntry(ackEntry);
                 ref.setMessageEntry(messageEntry);
@@ -100,9 +100,9 @@
 
     public void addReferenceFileIdsInUse() {
         for (StoreEntry entry = ackContainer.getFirst(); entry != null; entry = ackContainer.getNext(entry)) {
-            TopicSubAck subAck = (TopicSubAck)ackContainer.get(entry);
+            TopicSubAck subAck = ackContainer.get(entry);
             if (subAck.getCount() > 0) {
-                ReferenceRecord rr = (ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
+                ReferenceRecord rr = messageContainer.getValue(subAck.getMessageEntry());
                 addInterest(rr);
             }
         }
@@ -121,11 +121,11 @@
                                          MessageId messageId) throws IOException {
         String key = getSubscriptionKey(clientId, subscriptionName);
 
-        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+        TopicSubContainer container = subscriberMessages.get(key);
         if (container != null) {
             ConsumerMessageRef ref = container.remove(messageId);
             if (ref != null) {
-                TopicSubAck tsa = (TopicSubAck)ackContainer.get(ref.getAckEntry());
+                TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
                 if (tsa != null) {
                     if (tsa.decrementCount() <= 0) {
                         StoreEntry entry = ref.getAckEntry();
@@ -179,24 +179,24 @@
     }
 
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-        return (SubscriptionInfo[])subscriberContainer.values()
+        return subscriberContainer.values()
             .toArray(new SubscriptionInfo[subscriberContainer.size()]);
     }
 
     public int getMessageCount(String clientId, String subscriberName) throws IOException {
         String key = getSubscriptionKey(clientId, subscriberName);
-        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+        TopicSubContainer container = subscriberMessages.get(key);
         return container != null ? container.size() : 0;
     }
 
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
-        return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
+        return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
     }
 
     public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
                                                  MessageRecoveryListener listener) throws Exception {
         String key = getSubscriptionKey(clientId, subscriptionName);
-        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+        TopicSubContainer container = subscriberMessages.get(key);
         if (container != null) {
             int count = 0;
             StoreEntry entry = container.getBatchEntry();
@@ -230,7 +230,7 @@
     public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
         throws Exception {
         String key = getSubscriptionKey(clientId, subscriptionName);
-        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+        TopicSubContainer container = subscriberMessages.get(key);
         if (container != null) {
             for (Iterator i = container.iterator(); i.hasNext();) {
                 ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
@@ -246,7 +246,7 @@
 
     public synchronized void resetBatching(String clientId, String subscriptionName) {
         String key = getSubscriptionKey(clientId, subscriptionName);
-        TopicSubContainer topicSubContainer = (TopicSubContainer)subscriberMessages.get(key);
+        TopicSubContainer topicSubContainer = subscriberMessages.get(key);
         if (topicSubContainer != null) {
             topicSubContainer.reset();
         }
@@ -254,11 +254,11 @@
 
     protected void removeSubscriberMessageContainer(String key) throws IOException {
         subscriberContainer.remove(key);
-        TopicSubContainer container = (TopicSubContainer)subscriberMessages.remove(key);
+        TopicSubContainer container = subscriberMessages.remove(key);
         for (Iterator i = container.iterator(); i.hasNext();) {
             ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
             if (ref != null) {
-                TopicSubAck tsa = (TopicSubAck)ackContainer.get(ref.getAckEntry());
+                TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
                 if (tsa != null) {
                     if (tsa.decrementCount() <= 0) {
                         ackContainer.remove(ref.getAckEntry());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java Fri Aug 10 17:49:19 2007
@@ -33,8 +33,7 @@
  * @version $Revision: 1.4 $
  */
 class KahaTransaction {
-    private static final Log LOG = LogFactory.getLog(KahaTransaction.class);
-    protected List list = new ArrayList();
+    protected List<TxCommand> list = new ArrayList<TxCommand>();
 
     void add(KahaMessageStore store, BaseCommand command) {
         TxCommand tx = new TxCommand();
@@ -44,27 +43,27 @@
     }
 
     Message[] getMessages() {
-        List result = new ArrayList();
+        List<BaseCommand> result = new ArrayList<BaseCommand>();
         for (int i = 0; i < list.size(); i++) {
-            TxCommand command = (TxCommand)list.get(i);
+            TxCommand command = list.get(i);
             if (command.isAdd()) {
                 result.add(command.getCommand());
             }
         }
         Message[] messages = new Message[result.size()];
-        return (Message[])result.toArray(messages);
+        return result.toArray(messages);
     }
 
     MessageAck[] getAcks() {
-        List result = new ArrayList();
+        List<BaseCommand> result = new ArrayList<BaseCommand>();
         for (int i = 0; i < list.size(); i++) {
-            TxCommand command = (TxCommand)list.get(i);
+            TxCommand command = list.get(i);
             if (command.isRemove()) {
                 result.add(command.getCommand());
             }
         }
         MessageAck[] acks = new MessageAck[result.size()];
-        return (MessageAck[])result.toArray(acks);
+        return result.toArray(acks);
     }
 
     void prepare() {
@@ -79,14 +78,14 @@
      */
     void commit(KahaTransactionStore transactionStore) throws IOException {
         for (int i = 0; i < list.size(); i++) {
-            TxCommand command = (TxCommand)list.get(i);
+            TxCommand command = list.get(i);
             MessageStore ms = transactionStore.getStoreById(command.getMessageStoreKey());
             if (command.isAdd()) {
                 ms.addMessage(null, (Message)command.getCommand());
             }
         }
         for (int i = 0; i < list.size(); i++) {
-            TxCommand command = (TxCommand)list.get(i);
+            TxCommand command = list.get(i);
             MessageStore ms = transactionStore.getStoreById(command.getMessageStoreKey());
             if (command.isRemove()) {
                 ms.removeMessage(null, (MessageAck)command.getCommand());
@@ -94,11 +93,11 @@
         }
     }
 
-    List getList() {
-        return new ArrayList(list);
+    List<TxCommand> getList() {
+        return new ArrayList<TxCommand>(list);
     }
 
-    void setList(List list) {
+    void setList(List<TxCommand> list) {
         this.list = list;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java Fri Aug 10 17:49:19 2007
@@ -25,7 +25,7 @@
  */
 public class TopicSubAck {
 
-    private int count = 0;
+    private int count;
     private StoreEntry messageEntry;
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Fri Aug 10 17:49:19 2007
@@ -41,14 +41,14 @@
 public class MemoryMessageStore implements MessageStore {
 
     protected final ActiveMQDestination destination;
-    protected final Map messageTable;
+    protected final Map<MessageId, Message> messageTable;
     protected MessageId lastBatchId;
 
     public MemoryMessageStore(ActiveMQDestination destination) {
-        this(destination, new LinkedHashMap());
+        this(destination, new LinkedHashMap<MessageId, Message>());
     }
 
-    public MemoryMessageStore(ActiveMQDestination destination, Map messageTable) {
+    public MemoryMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable) {
         this.destination = destination;
         this.messageTable = Collections.synchronizedMap(messageTable);
     }
@@ -68,7 +68,7 @@
     // }
 
     public Message getMessage(MessageId identity) throws IOException {
-        return (Message)messageTable.get(identity);
+        return messageTable.get(identity);
     }
 
     // public String getMessageReference(MessageId identity) throws IOException{
@@ -92,8 +92,8 @@
         // the message table is a synchronizedMap - so just have to synchronize
         // here
         synchronized (messageTable) {
-            for (Iterator iter = messageTable.values().iterator(); iter.hasNext();) {
-                Object msg = (Object)iter.next();
+            for (Iterator<Message> iter = messageTable.values().iterator(); iter.hasNext();) {
+                Object msg = iter.next();
                 if (msg.getClass() == MessageId.class) {
                     listener.recoverMessageReference((MessageId)msg);
                 } else {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java Fri Aug 10 17:49:19 2007
@@ -24,6 +24,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.memory.UsageManager;
@@ -42,16 +43,16 @@
     private static final Log LOG = LogFactory.getLog(MemoryPersistenceAdapter.class);
 
     MemoryTransactionStore transactionStore;
-    ConcurrentHashMap topics = new ConcurrentHashMap();
-    ConcurrentHashMap queues = new ConcurrentHashMap();
+    ConcurrentHashMap<ActiveMQDestination, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>();
+    ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
     private boolean useExternalMessageReferences;
 
-    public Set getDestinations() {
-        Set rc = new HashSet(queues.size() + topics.size());
-        for (Iterator iter = queues.keySet().iterator(); iter.hasNext();) {
+    public Set<ActiveMQDestination> getDestinations() {
+        Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size());
+        for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); iter.hasNext();) {
             rc.add(iter.next());
         }
-        for (Iterator iter = topics.keySet().iterator(); iter.hasNext();) {
+        for (Iterator<ActiveMQDestination> iter = topics.keySet().iterator(); iter.hasNext();) {
             rc.add(iter.next());
         }
         return rc;
@@ -62,7 +63,7 @@
     }
 
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
-        MessageStore rc = (MessageStore)queues.get(destination);
+        MessageStore rc = queues.get(destination);
         if (rc == null) {
             rc = new MemoryMessageStore(destination);
             if (transactionStore != null) {
@@ -74,7 +75,7 @@
     }
 
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
-        TopicMessageStore rc = (TopicMessageStore)topics.get(destination);
+        TopicMessageStore rc = topics.get(destination);
         if (rc == null) {
             rc = new MemoryTopicMessageStore(destination);
             if (transactionStore != null) {
@@ -112,13 +113,13 @@
     }
 
     public void deleteAllMessages() throws IOException {
-        for (Iterator iter = topics.values().iterator(); iter.hasNext();) {
+        for (Iterator<TopicMessageStore> iter = topics.values().iterator(); iter.hasNext();) {
             MemoryMessageStore store = asMemoryMessageStore(iter.next());
             if (store != null) {
                 store.delete();
             }
         }
-        for (Iterator iter = queues.values().iterator(); iter.hasNext();) {
+        for (Iterator<MessageStore> iter = queues.values().iterator(); iter.hasNext();) {
             MemoryMessageStore store = asMemoryMessageStore(iter.next());
             if (store != null) {
                 store.delete();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Fri Aug 10 17:49:19 2007
@@ -37,41 +37,45 @@
  */
 public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
 
-    private Map subscriberDatabase;
-    private Map topicSubMap;
+    private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
+    private Map<SubscriptionKey, MemoryTopicSub> topicSubMap;
 
     public MemoryTopicMessageStore(ActiveMQDestination destination) {
-        this(destination, new LRUCache(100, 100, 0.75f, false), makeMap());
+        this(destination, new LRUCache<MessageId, Message>(100, 100, 0.75f, false), makeSubscriptionInfoMap());
     }
 
-    protected static Map makeMap() {
-        return Collections.synchronizedMap(new HashMap());
-    }
-
-    public MemoryTopicMessageStore(ActiveMQDestination destination, Map messageTable, Map subscriberDatabase) {
+    public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
         super(destination, messageTable);
         this.subscriberDatabase = subscriberDatabase;
-        this.topicSubMap = makeMap();
+        this.topicSubMap = makeSubMap();
+    }
+
+    protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
+        return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>());
+    }
+    
+    protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() {
+        return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>());
     }
 
     public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
         super.addMessage(context, message);
-        for (Iterator i = topicSubMap.values().iterator(); i.hasNext();) {
-            MemoryTopicSub sub = (MemoryTopicSub)i.next();
+        for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();) {
+            MemoryTopicSub sub = i.next();
             sub.addMessage(message.getMessageId(), message);
         }
     }
 
     public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
         SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
-        MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(key);
+        MemoryTopicSub sub = topicSubMap.get(key);
         if (sub != null) {
             sub.removeMessage(messageId);
         }
     }
 
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
-        return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
+        return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
     }
 
     public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
@@ -94,7 +98,7 @@
     }
 
     public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
-        MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
+        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
         if (sub != null) {
             sub.recoverSubscription(listener);
         }
@@ -107,12 +111,12 @@
     }
 
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-        return (SubscriptionInfo[])subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
+        return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
     }
 
     public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
         int result = 0;
-        MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
+        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
         if (sub != null) {
             result = sub.size();
         }
@@ -120,14 +124,14 @@
     }
 
     public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
-        MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
+        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
         if (sub != null) {
             sub.recoverNextMessages(maxReturned, listener);
         }
     }
 
     public void resetBatching(String clientId, String subscriptionName) {
-        MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
+        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
         if (sub != null) {
             sub.resetBatching();
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java Fri Aug 10 17:49:19 2007
@@ -31,7 +31,7 @@
  */
 class MemoryTopicSub {
 
-    private Map map = new LinkedHashMap();
+    private Map<MessageId, Message> map = new LinkedHashMap<MessageId, Message>();
     private MessageId lastBatch;
 
     void addMessage(MessageId id, Message message) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java Fri Aug 10 17:49:19 2007
@@ -43,16 +43,16 @@
  */
 public class MemoryTransactionStore implements TransactionStore {
 
-    ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
+    ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
 
-    ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
+    ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
 
     private boolean doingRecover;
 
     public static class Tx {
-        private ArrayList messages = new ArrayList();
+        private ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
 
-        private ArrayList acks = new ArrayList();
+        private ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
 
         public void add(AddMessageCommand msg) {
             messages.add(msg);
@@ -65,8 +65,8 @@
         public Message[] getMessages() {
             Message rc[] = new Message[messages.size()];
             int count = 0;
-            for (Iterator iter = messages.iterator(); iter.hasNext();) {
-                AddMessageCommand cmd = (AddMessageCommand)iter.next();
+            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
+                AddMessageCommand cmd = iter.next();
                 rc[count++] = cmd.getMessage();
             }
             return rc;
@@ -75,8 +75,8 @@
         public MessageAck[] getAcks() {
             MessageAck rc[] = new MessageAck[acks.size()];
             int count = 0;
-            for (Iterator iter = acks.iterator(); iter.hasNext();) {
-                RemoveMessageCommand cmd = (RemoveMessageCommand)iter.next();
+            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
+                RemoveMessageCommand cmd = iter.next();
                 rc[count++] = cmd.getMessageAck();
             }
             return rc;
@@ -87,13 +87,13 @@
          */
         public void commit() throws IOException {
             // Do all the message adds.
-            for (Iterator iter = messages.iterator(); iter.hasNext();) {
-                AddMessageCommand cmd = (AddMessageCommand)iter.next();
+            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
+                AddMessageCommand cmd = iter.next();
                 cmd.run();
             }
             // And removes..
-            for (Iterator iter = acks.iterator(); iter.hasNext();) {
-                RemoveMessageCommand cmd = (RemoveMessageCommand)iter.next();
+            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
+                RemoveMessageCommand cmd = iter.next();
                 cmd.run();
             }
         }
@@ -139,7 +139,7 @@
      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
      */
     public void prepare(TransactionId txid) {
-        Tx tx = (Tx)inflightTransactions.remove(txid);
+        Tx tx = inflightTransactions.remove(txid);
         if (tx == null) {
             return;
         }
@@ -147,7 +147,7 @@
     }
 
     public Tx getTx(Object txid) {
-        Tx tx = (Tx)inflightTransactions.get(txid);
+        Tx tx = inflightTransactions.get(txid);
         if (tx == null) {
             tx = new Tx();
             inflightTransactions.put(txid, tx);
@@ -163,9 +163,9 @@
 
         Tx tx;
         if (wasPrepared) {
-            tx = (Tx)preparedTransactions.remove(txid);
+            tx = preparedTransactions.remove(txid);
         } else {
-            tx = (Tx)inflightTransactions.remove(txid);
+            tx = inflightTransactions.remove(txid);
         }
 
         if (tx == null) {
@@ -194,9 +194,9 @@
         inflightTransactions.clear();
         this.doingRecover = true;
         try {
-            for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
-                Object txid = (Object)iter.next();
-                Tx tx = (Tx)preparedTransactions.get(txid);
+            for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
+                Object txid = iter.next();
+                Tx tx = preparedTransactions.get(txid);
                 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
             }
         } finally {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java Fri Aug 10 17:49:19 2007
@@ -24,7 +24,7 @@
  * 
  * @version $Revision$
  */
-public class DefaultThreadPools {
+public final class DefaultThreadPools {
 
     private static final Executor DEFAULT_POOL;
     static {
@@ -35,9 +35,11 @@
                 return thread;
             }
         });
-    }
-    
+    }    
     private static final TaskRunnerFactory DEFAULT_TASK_RUNNER_FACTORY = new TaskRunnerFactory();
+    
+    private DefaultThreadPools() {        
+    }
     
     public static Executor getDefaultPool() {
         return DEFAULT_POOL;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Fri Aug 10 17:49:19 2007
@@ -26,27 +26,35 @@
 /**
  * @version $Revision$
  */
-public class Scheduler {
+public final class Scheduler {
 
-    public static final ScheduledThreadPoolExecutor CLOCK_DAEMON = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
-        public Thread newThread(Runnable runnable) {
-            Thread thread = new Thread(runnable, "ActiveMQ Scheduler");
-            thread.setDaemon(true);
-            return thread;
-        }
-    });
+    public static final ScheduledThreadPoolExecutor CLOCK_DAEMON = new ScheduledThreadPoolExecutor(5, createThreadFactory());
     static {
         CLOCK_DAEMON.setKeepAliveTime(5, TimeUnit.SECONDS);
     }
-    private static final HashMap CLOCK_TICKETS = new HashMap();
+    private static final HashMap<Runnable, ScheduledFuture> CLOCK_TICKETS = new HashMap<Runnable, ScheduledFuture>();
+
+    private Scheduler() {
+    }
+
+    private static ThreadFactory createThreadFactory() {
+        return new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "ActiveMQ Scheduler");
+                thread.setDaemon(true);
+                return thread;
+            }
+        };
+    }
 
     public static synchronized void executePeriodically(final Runnable task, long period) {
-        ScheduledFuture ticket = CLOCK_DAEMON.scheduleAtFixedRate(task, period, period, TimeUnit.MILLISECONDS);
+        ScheduledFuture ticket = CLOCK_DAEMON
+            .scheduleAtFixedRate(task, period, period, TimeUnit.MILLISECONDS);
         CLOCK_TICKETS.put(task, ticket);
     }
 
     public static synchronized void cancel(Runnable task) {
-        ScheduledFuture ticket = (ScheduledFuture)CLOCK_TICKETS.remove(task);
+        ScheduledFuture ticket = CLOCK_TICKETS.remove(task);
         if (ticket != null) {
             ticket.cancel(false);
             if (ticket instanceof Runnable) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java Fri Aug 10 17:49:19 2007
@@ -76,7 +76,7 @@
     }
 
     protected ExecutorService createDefaultExecutor() {
-        ThreadPoolExecutor rc = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() {
+        ThreadPoolExecutor rc = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {
                 Thread thread = new Thread(runnable, name);
                 thread.setDaemon(daemon);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java Fri Aug 10 17:49:19 2007
@@ -26,8 +26,8 @@
 
     private final Object mutex = new Object();
     private boolean on;
-    private int turningOff = 0;
-    private int usage = 0;
+    private int turningOff;
+    private int usage;
 
     public Valve(boolean on) {
         this.on = on;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java Fri Aug 10 17:49:19 2007
@@ -37,7 +37,7 @@
     public static final byte PREPARED_STATE = 2; // can go to: 3
     public static final byte FINISHED_STATE = 3;
 
-    private ArrayList synchronizations = new ArrayList();
+    private ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
     private byte state = START_STATE;
 
     public byte getState() {
@@ -81,15 +81,15 @@
     }
 
     protected void fireAfterCommit() throws Exception {
-        for (Iterator iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = (Synchronization)iter.next();
+        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+            Synchronization s = iter.next();
             s.afterCommit();
         }
     }
 
     public void fireAfterRollback() throws Exception {
-        for (Iterator iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = (Synchronization)iter.next();
+        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+            Synchronization s = iter.next();
             s.afterRollback();
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java Fri Aug 10 17:49:19 2007
@@ -29,7 +29,7 @@
     private static final Log LOG = LogFactory.getLog(FutureResponse.class);
 
     private final ResponseCallback responseCallback;
-    private final ArrayBlockingQueue responseSlot = new ArrayBlockingQueue(1);
+    private final ArrayBlockingQueue<Response> responseSlot = new ArrayBlockingQueue<Response>(1);
 
     public FutureResponse(ResponseCallback responseCallback) {
         this.responseCallback = responseCallback;
@@ -37,7 +37,7 @@
 
     public Response getResult() throws IOException {
         try {
-            return (Response)responseSlot.take();
+            return responseSlot.take();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             if (LOG.isDebugEnabled()) {
@@ -49,7 +49,7 @@
 
     public Response getResult(int timeout) throws IOException {
         try {
-            return (Response)responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
+            return responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             throw new InterruptedIOException("Interrupted.");
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Fri Aug 10 17:49:19 2007
@@ -39,7 +39,7 @@
 public class ResponseCorrelator extends TransportFilter {
 
     private static final Log LOG = LogFactory.getLog(ResponseCorrelator.class);
-    private final Map requestMap = new HashMap();
+    private final Map<Integer, FutureResponse> requestMap = new HashMap<Integer, FutureResponse>();
     private IntSequenceGenerator sequenceGenerator;
     private final boolean debug = LOG.isDebugEnabled();
 
@@ -87,7 +87,7 @@
             Response response = (Response)command;
             FutureResponse future = null;
             synchronized (requestMap) {
-                future = (FutureResponse)requestMap.remove(Integer.valueOf(response.getCorrelationId()));
+                future = requestMap.remove(Integer.valueOf(response.getCorrelationId()));
             }
             if (future != null) {
                 future.set(response);
@@ -107,10 +107,10 @@
      */
     public void onException(IOException error) {
         // Copy and Clear the request Map
-        ArrayList requests = new ArrayList(requestMap.values());
+        ArrayList<FutureResponse> requests = new ArrayList<FutureResponse>(requestMap.values());
         requestMap.clear();
-        for (Iterator iter = requests.iterator(); iter.hasNext();) {
-            FutureResponse fr = (FutureResponse)iter.next();
+        for (Iterator<FutureResponse> iter = requests.iterator(); iter.hasNext();) {
+            FutureResponse fr = iter.next();
             fr.set(new ExceptionResponse(error));
         }
         super.onException(error);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java Fri Aug 10 17:49:19 2007
@@ -122,7 +122,7 @@
      * @param target
      * @return the target
      */
-    Object narrow(Class target);
+    <T> T narrow(Class<T> target);
 
     /**
      * @return the remote address for this connection

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java Fri Aug 10 17:49:19 2007
@@ -37,7 +37,7 @@
 
     private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
     private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
-    private static final ConcurrentHashMap TRANSPORT_FACTORYS = new ConcurrentHashMap();
+    private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
 
     public abstract TransportServer doBind(String brokerId, URI location) throws IOException;
 
@@ -108,7 +108,7 @@
 
     public Transport doConnect(URI location) throws Exception {
         try {
-            Map options = new HashMap(URISupport.parseParamters(location));
+            Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
             WireFormat wf = createWireFormat(options);
             Transport transport = createTransport(location, wf);
             Transport rc = configure(transport, wf, options);
@@ -123,7 +123,7 @@
 
     public Transport doCompositeConnect(URI location) throws Exception {
         try {
-            Map options = new HashMap(URISupport.parseParamters(location));
+            Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
             WireFormat wf = createWireFormat(options);
             Transport transport = createTransport(location, wf);
             Transport rc = compositeConfigure(transport, wf, options);
@@ -157,7 +157,7 @@
         if (scheme == null) {
             throw new IOException("Transport not scheme specified: [" + location + "]");
         }
-        TransportFactory tf = (TransportFactory)TRANSPORT_FACTORYS.get(scheme);
+        TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
         if (tf == null) {
             // Try to load if from a META-INF property.
             try {
@@ -170,13 +170,13 @@
         return tf;
     }
 
-    protected WireFormat createWireFormat(Map options) throws IOException {
+    protected WireFormat createWireFormat(Map<String, String> options) throws IOException {
         WireFormatFactory factory = createWireFormatFactory(options);
         WireFormat format = factory.createWireFormat();
         return format;
     }
 
-    protected WireFormatFactory createWireFormatFactory(Map options) throws IOException {
+    protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
         String wireFormat = (String)options.get("wireFormat");
         if (wireFormat == null) {
             wireFormat = getDefaultWireFormatType();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java Fri Aug 10 17:49:19 2007
@@ -106,9 +106,9 @@
         transportListener.transportResumed();
     }
 
-    public Object narrow(Class target) {
+    public <T> T narrow(Class<T> target) {
         if (target.isAssignableFrom(getClass())) {
-            return this;
+            return target.cast(this);
         }
         return next.narrow(target);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java Fri Aug 10 17:49:19 2007
@@ -33,13 +33,13 @@
         this(next, LogFactory.getLog(TransportLogger.class.getName() + ".Connection:" + getNextId()));
     }
 
-    private static synchronized int getNextId() {
-        return ++lastId;
-    }
-
     public TransportLogger(Transport next, Log log) {
         super(next);
         this.log = log;
+    }
+
+    private static synchronized int getNextId() {
+        return ++lastId;
     }
 
     public Object request(Object command) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java?view=diff&rev=564814&r1=564813&r2=564814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java Fri Aug 10 17:49:19 2007
@@ -35,8 +35,8 @@
     private boolean daemon = true;
     private boolean joinOnStop = true;
     private Thread runner;
- // should be a multiple of 128k
-    private long stackSize = 0;
+    // should be a multiple of 128k
+    private long stackSize;
 
     public TransportServerThreadSupport() {
     }



Mime
View raw message