activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1492214 - in /activemq/trunk: activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCTablePrefixAssignedTest.java
Date Wed, 12 Jun 2013 14:18:51 GMT
Author: tabish
Date: Wed Jun 12 14:18:51 2013
New Revision: 1492214

URL: http://svn.apache.org/r1492214
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-4581

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCTablePrefixAssignedTest.java
  (with props)
Modified:
    activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java

Modified: activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1492214&r1=1492213&r2=1492214&view=diff
==============================================================================
--- activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Wed Jun 12 14:18:51 2013
@@ -32,8 +32,8 @@ import javax.sql.DataSource;
 
 import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.Locker;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -42,7 +42,6 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.broker.Locker;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
@@ -62,22 +61,21 @@ import org.slf4j.LoggerFactory;
 /**
  * A {@link PersistenceAdapter} implementation using JDBC for persistence
  * storage.
- * 
+ *
  * This persistence adapter will correctly remember prepared XA transactions,
  * but it will not keep track of local transaction commits so that operations
  * performed against the Message store are done as a single uow.
- * 
+ *
  * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
- * 
- * 
+ *
  */
 public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter
{
 
     private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
     private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
-                                                                   "META-INF/services/org/apache/activemq/store/jdbc/");
+        "META-INF/services/org/apache/activemq/store/jdbc/");
     private static FactoryFinder lockFactoryFinder = new FactoryFinder(
-                                                                    "META-INF/services/org/apache/activemq/store/jdbc/lock/");
+        "META-INF/services/org/apache/activemq/store/jdbc/lock/");
 
     public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000;
 
@@ -94,13 +92,13 @@ public class JDBCPersistenceAdapter exte
     private int transactionIsolation;
     private File directory;
     private boolean changeAutoCommitAllowed = true;
-    
+
     protected int maxProducersToAudit=1024;
     protected int maxAuditDepth=1000;
     protected boolean enableAudit=false;
     protected int auditRecoveryDepth = 1024;
     protected ActiveMQMessageAudit audit;
-    
+
     protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
     protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
 
@@ -116,6 +114,7 @@ public class JDBCPersistenceAdapter exte
         this.wireFormat = wireFormat;
     }
 
+    @Override
     public Set<ActiveMQDestination> getDestinations() {
         TransactionContext c = null;
         try {
@@ -140,15 +139,16 @@ public class JDBCPersistenceAdapter exte
     private Set<ActiveMQDestination> emptyDestinationSet() {
         return Collections.EMPTY_SET;
     }
-    
+
     protected void createMessageAudit() {
         if (enableAudit && audit == null) {
             audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
             TransactionContext c = null;
-            
+
             try {
                 c = getTransactionContext();
                 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener()
{
+                    @Override
                     public void messageId(MessageId id) {
                         audit.isDuplicate(id);
                     }
@@ -163,14 +163,15 @@ public class JDBCPersistenceAdapter exte
                     }
                 }
             }
-    	}
+        }
     }
-    
+
     public void initSequenceIdGenerator() {
         TransactionContext c = null;
         try {
             c = getTransactionContext();
             getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener()
{
+                @Override
                 public void messageId(MessageId id) {
                     audit.isDuplicate(id);
                 }
@@ -185,9 +186,9 @@ public class JDBCPersistenceAdapter exte
                 }
             }
         }
-        
     }
 
+    @Override
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException
{
         MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination,
audit);
         if (transactionStore != null) {
@@ -196,6 +197,7 @@ public class JDBCPersistenceAdapter exte
         return rc;
     }
 
+    @Override
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException
{
         TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat,
destination, audit);
         if (transactionStore != null) {
@@ -208,6 +210,7 @@ public class JDBCPersistenceAdapter exte
      * Cleanup method to remove any state associated with the given destination
      * @param destination Destination to forget
      */
+    @Override
     public void removeQueueMessageStore(ActiveMQQueue destination) {
         if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination))
{
             try {
@@ -237,9 +240,11 @@ public class JDBCPersistenceAdapter exte
      *
      * @param destination Destination to forget
      */
+    @Override
     public void removeTopicMessageStore(ActiveMQTopic destination) {
     }
 
+    @Override
     public TransactionStore createTransactionStore() throws IOException {
         if (transactionStore == null) {
             transactionStore = new JdbcMemoryTransactionStore(this);
@@ -247,6 +252,7 @@ public class JDBCPersistenceAdapter exte
         return this.transactionStore;
     }
 
+    @Override
     public long getLastMessageBrokerSequenceId() throws IOException {
         TransactionContext c = getTransactionContext();
         try {
@@ -270,7 +276,8 @@ public class JDBCPersistenceAdapter exte
             c.close();
         }
     }
-    
+
+    @Override
     public long getLastProducerSequenceId(ProducerId id) throws IOException {
         TransactionContext c = getTransactionContext();
         try {
@@ -303,6 +310,7 @@ public class JDBCPersistenceAdapter exte
         }
     }
 
+    @Override
     public void doStart() throws Exception {
 
         if( brokerService!=null ) {
@@ -312,6 +320,7 @@ public class JDBCPersistenceAdapter exte
         // Cleanup the db periodically.
         if (cleanupPeriod > 0) {
             cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable()
{
+                @Override
                 public void run() {
                     cleanup();
                 }
@@ -320,6 +329,7 @@ public class JDBCPersistenceAdapter exte
         createMessageAudit();
     }
 
+    @Override
     public synchronized void doStop(ServiceStopper stopper) throws Exception {
         if (cleanupTicket != null) {
             cleanupTicket.cancel(true);
@@ -353,9 +363,11 @@ public class JDBCPersistenceAdapter exte
         this.clockDaemon = clockDaemon;
     }
 
+    @Override
     public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
         if (clockDaemon == null) {
             clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
+                @Override
                 public Thread newThread(Runnable runnable) {
                     Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer");
                     thread.setDaemon(true);
@@ -374,7 +386,6 @@ public class JDBCPersistenceAdapter exte
     }
 
     /**
-     *
      * @deprecated as of 5.7.0, replaced by {@link #getLocker()}
      */
     @Deprecated
@@ -388,6 +399,7 @@ public class JDBCPersistenceAdapter exte
      *
      * @deprecated as of 5.7.0, replaced by {@link #setLocker(org.apache.activemq.broker.Locker)}
      */
+    @Deprecated
     public void setDatabaseLocker(Locker locker) throws IOException {
         setLocker(locker);
     }
@@ -405,7 +417,7 @@ public class JDBCPersistenceAdapter exte
         }
         return lockDataSource;
     }
-    
+
     public void setLockDataSource(DataSource dataSource) {
         this.lockDataSource = dataSource;
     }
@@ -418,9 +430,9 @@ public class JDBCPersistenceAdapter exte
      * @throws IOException
      */
     protected JDBCAdapter createAdapter() throws IOException {
-       
+
         adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
-       
+
         // Use the default JDBC adapter if the
         // Database type is not recognized.
         if (adapter == null) {
@@ -492,16 +504,19 @@ public class JDBCPersistenceAdapter exte
         return answer;
     }
 
+    @Override
     public void beginTransaction(ConnectionContext context) throws IOException {
         TransactionContext transactionContext = getTransactionContext(context);
         transactionContext.begin();
     }
 
+    @Override
     public void commitTransaction(ConnectionContext context) throws IOException {
         TransactionContext transactionContext = getTransactionContext(context);
         transactionContext.commit();
     }
 
+    @Override
     public void rollbackTransaction(ConnectionContext context) throws IOException {
         TransactionContext transactionContext = getTransactionContext(context);
         transactionContext.rollback();
@@ -533,6 +548,7 @@ public class JDBCPersistenceAdapter exte
         this.changeAutoCommitAllowed = changeAutoCommitAllowed;
     }
 
+    @Override
     public void deleteAllMessages() throws IOException {
         TransactionContext c = getTransactionContext();
         try {
@@ -596,15 +612,20 @@ public class JDBCPersistenceAdapter exte
 
     public void setStatements(Statements statements) {
         this.statements = statements;
+        if (adapter != null) {
+            this.adapter.setStatements(getStatements());
+        }
     }
 
     /**
      * @param usageManager The UsageManager that is controlling the
      *                destination's memory usage.
      */
+    @Override
     public void setUsageManager(SystemUsage usageManager) {
     }
 
+    @Override
     public Locker createDefaultLocker() throws IOException {
         Locker locker = (Locker) loadAdapter(lockFactoryFinder, "lock");
         if (locker == null) {
@@ -615,17 +636,21 @@ public class JDBCPersistenceAdapter exte
         return locker;
     }
 
+    @Override
     public void setBrokerName(String brokerName) {
     }
 
+    @Override
     public String toString() {
         return "JDBCPersistenceAdapter(" + super.toString() + ")";
     }
 
+    @Override
     public void setDirectory(File dir) {
         this.directory=dir;
     }
-    
+
+    @Override
     public File getDirectory(){
         if (this.directory==null && brokerService != null){
             this.directory=brokerService.getBrokerDataDirectory();
@@ -634,6 +659,7 @@ public class JDBCPersistenceAdapter exte
     }
 
     // interesting bit here is proof that DB is ok
+    @Override
     public void checkpoint(boolean sync) throws IOException {
         // by pass TransactionContext to avoid IO Exception handler
         Connection connection = null;
@@ -652,6 +678,7 @@ public class JDBCPersistenceAdapter exte
         }
     }
 
+    @Override
     public long size(){
         return 0;
     }
@@ -663,10 +690,11 @@ public class JDBCPersistenceAdapter exte
      * not applied if DataBaseLocker is injected.
      *
      */
+    @Deprecated
     public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException
{
         getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval);
     }
-    
+
     /**
      * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
      * This allowable dirty isolation level may not be achievable in clustered DB environments
@@ -678,29 +706,29 @@ public class JDBCPersistenceAdapter exte
         this.transactionIsolation = transactionIsolation;
     }
 
-	public int getMaxProducersToAudit() {
-		return maxProducersToAudit;
-	}
-
-	public void setMaxProducersToAudit(int maxProducersToAudit) {
-		this.maxProducersToAudit = maxProducersToAudit;
-	}
-
-	public int getMaxAuditDepth() {
-		return maxAuditDepth;
-	}
-
-	public void setMaxAuditDepth(int maxAuditDepth) {
-		this.maxAuditDepth = maxAuditDepth;
-	}
-
-	public boolean isEnableAudit() {
-		return enableAudit;
-	}
-
-	public void setEnableAudit(boolean enableAudit) {
-		this.enableAudit = enableAudit;
-	}
+    public int getMaxProducersToAudit() {
+        return maxProducersToAudit;
+    }
+
+    public void setMaxProducersToAudit(int maxProducersToAudit) {
+        this.maxProducersToAudit = maxProducersToAudit;
+    }
+
+    public int getMaxAuditDepth() {
+        return maxAuditDepth;
+    }
+
+    public void setMaxAuditDepth(int maxAuditDepth) {
+        this.maxAuditDepth = maxAuditDepth;
+    }
+
+    public boolean isEnableAudit() {
+        return enableAudit;
+    }
+
+    public void setEnableAudit(boolean enableAudit) {
+        this.enableAudit = enableAudit;
+    }
 
     public int getAuditRecoveryDepth() {
         return auditRecoveryDepth;
@@ -764,7 +792,6 @@ public class JDBCPersistenceAdapter exte
         }
     }
 
-
     public void commitLastAck(ConnectionContext context, long xidLastAck, long priority,
ActiveMQDestination destination, String subName, String clientId) throws IOException {
         TransactionContext c = getTransactionContext(context);
         try {
@@ -816,5 +843,4 @@ public class JDBCPersistenceAdapter exte
         }
         return result;
     }
-
 }

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCTablePrefixAssignedTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCTablePrefixAssignedTest.java?rev=1492214&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCTablePrefixAssignedTest.java
(added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCTablePrefixAssignedTest.java
Wed Jun 12 14:18:51 2013
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JDBCTablePrefixAssignedTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JDBCTablePrefixAssignedTest.class);
+
+    private BrokerService service;
+
+    @Before
+    public void setUp() throws Exception {
+        service = createBroker();
+        service.start();
+        service.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        service.stop();
+        service.waitUntilStopped();
+    }
+
+    @Test
+    public void testTablesHave() throws Exception {
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue("TEST.FOO");
+        MessageProducer producer = session.createProducer(destination);
+
+        for (int i = 0; i < 10; ++i) {
+            producer.send(session.createTextMessage("test"));
+        }
+        producer.close();
+        connection.close();
+
+        List<Message> queuedMessages = null;
+        try {
+            queuedMessages = dumpMessages();
+        } catch (Exception ex) {
+            LOG.info("Caught ex: ", ex);
+            fail("Should not have thrown an exception");
+        }
+
+        assertNotNull(queuedMessages);
+        assertEquals("Should have found 10 messages", 10, queuedMessages.size());
+    }
+
+    protected List<Message> dumpMessages() throws Exception {
+        WireFormat wireFormat = new OpenWireFormat();
+        java.sql.Connection conn = ((JDBCPersistenceAdapter) service.getPersistenceAdapter()).getDataSource().getConnection();
+        PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG FROM MYPREFIX_ACTIVEMQ_MSGS");
+        ResultSet result = statement.executeQuery();
+        ArrayList<Message> results = new ArrayList<Message>();
+        while(result.next()) {
+            long id = result.getLong(1);
+            Message message = (Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
+            LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId()
+ ", MSG: " + message);
+            results.add(message);
+        }
+        statement.close();
+        conn.close();
+
+        return results;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        EmbeddedDataSource dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName("derbyDb");
+        dataSource.setCreateDatabase("create");
+
+        DefaultJDBCAdapter adapter = new DefaultJDBCAdapter();
+        jdbc.setAdapter(adapter);
+
+        Statements statements = new Statements();
+        statements.setTablePrefix("MYPREFIX_");
+        jdbc.setStatements(statements);
+
+        jdbc.setUseLock(false);
+        jdbc.setDataSource(dataSource);
+        jdbc.deleteAllMessages();
+        broker.setPersistenceAdapter(jdbc);
+        return broker;
+    }
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCTablePrefixAssignedTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message