From commits-return-6827-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Wed Aug 08 19:00:46 2007 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 50003 invoked from network); 8 Aug 2007 19:00:37 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 8 Aug 2007 19:00:37 -0000 Received: (qmail 68476 invoked by uid 500); 8 Aug 2007 19:00:35 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 68446 invoked by uid 500); 8 Aug 2007 19:00:34 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 68340 invoked by uid 99); 8 Aug 2007 19:00:34 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Aug 2007 12:00:34 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Aug 2007 19:00:26 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 64E491A987C; Wed, 8 Aug 2007 11:59:20 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r563982 [18/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jm... Date: Wed, 08 Aug 2007 18:58:13 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070808185920.64E491A987C@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Wed Aug 8 11:56:59 2007 @@ -97,16 +97,17 @@ public String[] getDropSchemaStatements() { if (dropSchemaStatements == null) { - dropSchemaStatements = new String[] { "DROP TABLE " + getFullAckTableName() + "", - "DROP TABLE " + getFullMessageTableName() + "", }; + dropSchemaStatements = new String[] {"DROP TABLE " + getFullAckTableName() + "", + "DROP TABLE " + getFullMessageTableName() + "",}; } return dropSchemaStatements; } public String getAddMessageStatement() { if (addMessageStatement == null) { - addMessageStatement = "INSERT INTO " + getFullMessageTableName() - + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, MSG) VALUES (?, ?, ?, ?, ?, ?)"; + addMessageStatement = "INSERT INTO " + + getFullMessageTableName() + + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, MSG) VALUES (?, ?, ?, ?, ?, ?)"; } return addMessageStatement; } @@ -128,7 +129,7 @@ public String getFindMessageSequenceIdStatement() { if (findMessageSequenceIdStatement == null) { findMessageSequenceIdStatement = "SELECT ID FROM " + getFullMessageTableName() - + " WHERE MSGID_PROD=? AND MSGID_SEQ=?"; + + " WHERE MSGID_PROD=? AND MSGID_SEQ=?"; } return findMessageSequenceIdStatement; } @@ -143,7 +144,7 @@ public String getFindAllMessagesStatement() { if (findAllMessagesStatement == null) { findAllMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName() - + " WHERE CONTAINER=? ORDER BY ID"; + + " WHERE CONTAINER=? ORDER BY ID"; } return findAllMessagesStatement; } @@ -164,8 +165,10 @@ public String getCreateDurableSubStatement() { if (createDurableSubStatement == null) { - createDurableSubStatement = "INSERT INTO " + getFullAckTableName() - + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST) " + "VALUES (?, ?, ?, ?, ?, ?)"; + createDurableSubStatement = "INSERT INTO " + + getFullAckTableName() + + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST) " + + "VALUES (?, ?, ?, ?, ?, ?)"; } return createDurableSubStatement; } @@ -173,15 +176,15 @@ public String getFindDurableSubStatement() { if (findDurableSubStatement == null) { findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName() - + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; + + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; } return findDurableSubStatement; } public String getFindAllDurableSubsStatement() { if (findAllDurableSubsStatement == null) { - findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM " + getFullAckTableName() - + " WHERE CONTAINER=?"; + findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM " + + getFullAckTableName() + " WHERE CONTAINER=?"; } return findAllDurableSubsStatement; } @@ -189,7 +192,7 @@ public String getUpdateLastAckOfDurableSubStatement() { if (updateLastAckOfDurableSubStatement == null) { updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?" - + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; + + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; } return updateLastAckOfDurableSubStatement; } @@ -197,62 +200,76 @@ public String getDeleteSubscriptionStatement() { if (deleteSubscriptionStatement == null) { deleteSubscriptionStatement = "DELETE FROM " + getFullAckTableName() - + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; + + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; } return deleteSubscriptionStatement; } public String getFindAllDurableSubMessagesStatement() { if (findAllDurableSubMessagesStatement == null) { - findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " - + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" - + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.ID"; + findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + + " M, " + getFullAckTableName() + " D " + + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + + " ORDER BY M.ID"; } return findAllDurableSubMessagesStatement; } - - public String getFindDurableSubMessagesStatement(){ - if(findDurableSubMessagesStatement==null){ - findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " - + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" - + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID"; + + public String getFindDurableSubMessagesStatement() { + if (findDurableSubMessagesStatement == null) { + findDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " + + getFullAckTableName() + " D " + + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + + " ORDER BY M.ID"; } return findDurableSubMessagesStatement; } - + public String findAllDurableSubMessagesStatement() { if (findAllDurableSubMessagesStatement == null) { - findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " - + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" - + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.ID"; + findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + + " M, " + getFullAckTableName() + " D " + + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + + " ORDER BY M.ID"; } return findAllDurableSubMessagesStatement; } - - public String getNextDurableSubscriberMessageStatement(){ - if (nextDurableSubscriberMessageStatement == null){ - nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " - + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" - + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID "; + + public String getNextDurableSubscriberMessageStatement() { + if (nextDurableSubscriberMessageStatement == null) { + nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM " + + getFullMessageTableName() + + " M, " + + getFullAckTableName() + + " D " + + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + + " ORDER BY M.ID "; } return nextDurableSubscriberMessageStatement; } - + /** * @return the durableSubscriberMessageCountStatement */ - - - public String getDurableSubscriberMessageCountStatement(){ - if (durableSubscriberMessageCountStatement==null){ - durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM " + getFullMessageTableName() + " M, " - + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" - + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"; + + public String getDurableSubscriberMessageCountStatement() { + if (durableSubscriberMessageCountStatement == null) { + durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM " + + getFullMessageTableName() + + " M, " + + getFullAckTableName() + + " D " + + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"; } return durableSubscriberMessageCountStatement; } - - public String getFindAllDestinationsStatement() { + + public String getFindAllDestinationsStatement() { if (findAllDestinationsStatement == null) { findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName(); } @@ -276,13 +293,15 @@ public String getDeleteOldMessagesStatement() { if (deleteOldMessagesStatement == null) { deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName() - + " WHERE ( EXPIRATION<>0 AND EXPIRATION0 AND EXPIRATION ? ORDER BY ID"; + public String getFindNextMessagesStatement() { + if (findNextMessagesStatement == null) { + findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName() + + " WHERE CONTAINER=? AND ID > ? ORDER BY ID"; } return findNextMessagesStatement; } - + /** * @return the lastAckedDurableSubscriberMessageStatement */ - public String getLastAckedDurableSubscriberMessageStatement(){ - if(lastAckedDurableSubscriberMessageStatement==null) { - lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName() - + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; + public String getLastAckedDurableSubscriberMessageStatement() { + if (lastAckedDurableSubscriberMessageStatement == null) { + lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + + getFullAckTableName() + + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; } return lastAckedDurableSubscriberMessageStatement; } - - public String getFullMessageTableName() { return getTablePrefix() + getMessageTableName(); } @@ -343,12 +361,11 @@ public String getFullAckTableName() { return getTablePrefix() + getDurableSubAcksTableName(); } - + public String getFullLockTableName() { return getTablePrefix() + getLockTableName(); } - /** * @return Returns the containerNameDataType. */ @@ -357,8 +374,7 @@ } /** - * @param containerNameDataType - * The containerNameDataType to set. + * @param containerNameDataType The containerNameDataType to set. */ public void setContainerNameDataType(String containerNameDataType) { this.containerNameDataType = containerNameDataType; @@ -372,8 +388,7 @@ } /** - * @param messageDataType - * The messageDataType to set. + * @param messageDataType The messageDataType to set. */ public void setBinaryDataType(String messageDataType) { this.binaryDataType = messageDataType; @@ -387,8 +402,7 @@ } /** - * @param messageTableName - * The messageTableName to set. + * @param messageTableName The messageTableName to set. */ public void setMessageTableName(String messageTableName) { this.messageTableName = messageTableName; @@ -402,8 +416,7 @@ } /** - * @param msgIdDataType - * The msgIdDataType to set. + * @param msgIdDataType The msgIdDataType to set. */ public void setMsgIdDataType(String msgIdDataType) { this.msgIdDataType = msgIdDataType; @@ -417,8 +430,7 @@ } /** - * @param sequenceDataType - * The sequenceDataType to set. + * @param sequenceDataType The sequenceDataType to set. */ public void setSequenceDataType(String sequenceDataType) { this.sequenceDataType = sequenceDataType; @@ -432,8 +444,7 @@ } /** - * @param tablePrefix - * The tablePrefix to set. + * @param tablePrefix The tablePrefix to set. */ public void setTablePrefix(String tablePrefix) { this.tablePrefix = tablePrefix; @@ -447,13 +458,12 @@ } /** - * @param durableSubAcksTableName - * The durableSubAcksTableName to set. + * @param durableSubAcksTableName The durableSubAcksTableName to set. */ public void setDurableSubAcksTableName(String durableSubAcksTableName) { this.durableSubAcksTableName = durableSubAcksTableName; } - + public String getLockTableName() { return lockTableName; } @@ -583,52 +593,47 @@ } /** - * @param findDurableSubMessagesStatement the findDurableSubMessagesStatement to set + * @param findDurableSubMessagesStatement the + * findDurableSubMessagesStatement to set */ - public void setFindDurableSubMessagesStatement(String findDurableSubMessagesStatement){ - this.findDurableSubMessagesStatement=findDurableSubMessagesStatement; + public void setFindDurableSubMessagesStatement(String findDurableSubMessagesStatement) { + this.findDurableSubMessagesStatement = findDurableSubMessagesStatement; } /** * @param nextDurableSubscriberMessageStatement the nextDurableSubscriberMessageStatement to set */ - public void setNextDurableSubscriberMessageStatement(String nextDurableSubscriberMessageStatement){ - this.nextDurableSubscriberMessageStatement=nextDurableSubscriberMessageStatement; + public void setNextDurableSubscriberMessageStatement(String nextDurableSubscriberMessageStatement) { + this.nextDurableSubscriberMessageStatement = nextDurableSubscriberMessageStatement; } - /** * @param durableSubscriberMessageCountStatement the durableSubscriberMessageCountStatement to set */ - public void setDurableSubscriberMessageCountStatement(String durableSubscriberMessageCountStatement){ - this.durableSubscriberMessageCountStatement=durableSubscriberMessageCountStatement; - } - + public void setDurableSubscriberMessageCountStatement(String durableSubscriberMessageCountStatement) { + this.durableSubscriberMessageCountStatement = durableSubscriberMessageCountStatement; + } + /** * @param findNextMessagesStatement the findNextMessagesStatement to set */ - public void setFindNextMessagesStatement(String findNextMessagesStatement){ - this.findNextMessagesStatement=findNextMessagesStatement; + public void setFindNextMessagesStatement(String findNextMessagesStatement) { + this.findNextMessagesStatement = findNextMessagesStatement; } /** * @param destinationMessageCountStatement the destinationMessageCountStatement to set */ - public void setDestinationMessageCountStatement(String destinationMessageCountStatement){ - this.destinationMessageCountStatement=destinationMessageCountStatement; + public void setDestinationMessageCountStatement(String destinationMessageCountStatement) { + this.destinationMessageCountStatement = destinationMessageCountStatement; } - - - /** * @param lastAckedDurableSubscriberMessageStatement the lastAckedDurableSubscriberMessageStatement to set */ - public void setLastAckedDurableSubscriberMessageStatement(String lastAckedDurableSubscriberMessageStatement){ - this.lastAckedDurableSubscriberMessageStatement=lastAckedDurableSubscriberMessageStatement; + public void setLastAckedDurableSubscriberMessageStatement( + String lastAckedDurableSubscriberMessageStatement) { + this.lastAckedDurableSubscriberMessageStatement = lastAckedDurableSubscriberMessageStatement; } - - - - + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java Wed Aug 8 11:56:59 2007 @@ -36,7 +36,7 @@ public class TransactionContext { private static final Log log = LogFactory.getLog(TransactionContext.class); - + private final DataSource dataSource; private Connection connection; private boolean inTx; @@ -49,7 +49,7 @@ } public Connection getConnection() throws IOException { - if( connection == null ) { + if (connection == null) { try { connection = dataSource.getConnection(); boolean autoCommit = !inTx; @@ -60,7 +60,7 @@ JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e); throw IOExceptionSupport.create(e); } - + try { connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED); } catch (Throwable e) { @@ -77,51 +77,54 @@ try { executeBatch(removedMessageStatement, "Failed to remove a message"); } finally { - removedMessageStatement=null; + removedMessageStatement = null; try { executeBatch(updateLastAckStatement, "Failed to ack a message"); } finally { - updateLastAckStatement=null; + updateLastAckStatement = null; } } } } private void executeBatch(PreparedStatement p, String message) throws SQLException { - if( p == null ) + if (p == null) return; - + try { int[] rc = p.executeBatch(); for (int i = 0; i < rc.length; i++) { int code = rc[i]; - if ( code < 0 && code != Statement.SUCCESS_NO_INFO ) { + if (code < 0 && code != Statement.SUCCESS_NO_INFO) { throw new SQLException(message + ". Response code: " + code); } } } finally { - try { p.close(); } catch (Throwable e) { } + try { + p.close(); + } catch (Throwable e) { + } } } - + public void close() throws IOException { - if( !inTx ) { + if (!inTx) { try { - + /** * we are not in a transaction so should not be committing ?? - * This was previously commented out - but had - * adverse affects on testing - so it's back! + * This was previously commented out - but had adverse affects + * on testing - so it's back! * */ - try{ + try { executeBatch(); } finally { if (connection != null && !connection.getAutoCommit()) { connection.commit(); } } - + } catch (SQLException e) { JDBCPersistenceAdapter.log("Error while closing connection: ", e); throw IOExceptionSupport.create(e); @@ -131,60 +134,60 @@ connection.close(); } } catch (Throwable e) { - log.warn("Close failed: "+e.getMessage(), e); + log.warn("Close failed: " + e.getMessage(), e); } finally { - connection=null; + connection = null; } } } } public void begin() throws IOException { - if( inTx ) + if (inTx) throw new IOException("Already started."); inTx = true; connection = getConnection(); } public void commit() throws IOException { - if( !inTx ) + if (!inTx) throw new IOException("Not started."); try { executeBatch(); - if( !connection.getAutoCommit() ) + if (!connection.getAutoCommit()) connection.commit(); } catch (SQLException e) { JDBCPersistenceAdapter.log("Commit failed: ", e); throw IOExceptionSupport.create(e); } finally { - inTx=false; + inTx = false; close(); } } - + public void rollback() throws IOException { - if( !inTx ) + if (!inTx) throw new IOException("Not started."); try { - if( addMessageStatement != null ) { + if (addMessageStatement != null) { addMessageStatement.close(); - addMessageStatement=null; + addMessageStatement = null; } - if( removedMessageStatement != null ) { + if (removedMessageStatement != null) { removedMessageStatement.close(); - removedMessageStatement=null; + removedMessageStatement = null; } - if( updateLastAckStatement != null ) { + if (updateLastAckStatement != null) { updateLastAckStatement.close(); - updateLastAckStatement=null; + updateLastAckStatement = null; } connection.rollback(); - + } catch (SQLException e) { JDBCPersistenceAdapter.log("Rollback failed: ", e); throw IOExceptionSupport.create(e); } finally { - inTx=false; + inTx = false; close(); } } @@ -192,6 +195,7 @@ public PreparedStatement getAddMessageStatement() { return addMessageStatement; } + public void setAddMessageStatement(PreparedStatement addMessageStatement) { this.addMessageStatement = addMessageStatement; } @@ -199,6 +203,7 @@ public PreparedStatement getUpdateLastAckStatement() { return updateLastAckStatement; } + public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) { this.updateLastAckStatement = ackMessageStatement; } @@ -206,6 +211,7 @@ public PreparedStatement getRemovedMessageStatement() { return removedMessageStatement; } + public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) { this.removedMessageStatement = removedMessageStatement; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java Wed Aug 8 11:56:59 2007 @@ -30,19 +30,17 @@ import org.apache.activemq.store.jdbc.TransactionContext; import org.apache.activemq.util.ByteArrayOutputStream; - /** - * This JDBCAdapter inserts and extracts BLOB data using the - * getBlob()/setBlob() operations. This is a little more involved - * since to insert a blob you have to: + * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob() + * operations. This is a little more involved since to insert a blob you have + * to: * - * 1: insert empty blob. - * 2: select the blob - * 3: finally update the blob with data value. + * 1: insert empty blob. 2: select the blob 3: finally update the blob with data + * value. * * The databases/JDBC drivers that use this adapter are: *
    - *
  • + *
  • *
* * @org.apache.xbean.XBean element="blobJDBCAdapter" @@ -50,23 +48,22 @@ * @version $Revision: 1.2 $ */ public class BlobJDBCAdapter extends DefaultJDBCAdapter { - - public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data) throws SQLException, - JMSException { + + public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data) + throws SQLException, JMSException { PreparedStatement s = null; ResultSet rs = null; try { - + // Add the Blob record. s = c.prepareStatement(statements.getAddMessageStatement()); s.setLong(1, seq); s.setString(2, destinationName); s.setString(3, messageID); s.setString(4, " "); - + if (s.executeUpdate() != 1) - throw new JMSException("Failed to broker message: " + messageID - + " in container."); + throw new JMSException("Failed to broker message: " + messageID + " in container."); s.close(); // Select the blob record so that we can update it. @@ -74,8 +71,7 @@ s.setLong(1, seq); rs = s.executeQuery(); if (!rs.next()) - throw new JMSException("Failed to broker message: " + messageID - + " in container."); + throw new JMSException("Failed to broker message: " + messageID + " in container."); // Update the blob Blob blob = rs.getBlob(1); @@ -90,8 +86,7 @@ s.setLong(2, seq); } catch (IOException e) { - throw (SQLException) new SQLException("BLOB could not be updated: " - + e).initCause(e); + throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e); } finally { try { rs.close(); @@ -103,37 +98,44 @@ } } } - + public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException { - PreparedStatement s=null; ResultSet rs=null; - try { - - s = c.getConnection().prepareStatement(statements.getFindMessageStatement()); - s.setLong(1, seq); - rs = s.executeQuery(); - - if( !rs.next() ) - return null; - Blob blob = rs.getBlob(1); - InputStream is = blob.getBinaryStream(); - - ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length()); - int ch; - while( (ch=is.read())>= 0 ) { - os.write(ch); - } - is.close(); - os.close(); - - return os.toByteArray(); - - } catch (IOException e) { - throw (SQLException) new SQLException("BLOB could not be updated: " - + e).initCause(e); + PreparedStatement s = null; + ResultSet rs = null; + try { + + s = c.getConnection().prepareStatement(statements.getFindMessageStatement()); + s.setLong(1, seq); + rs = s.executeQuery(); + + if (!rs.next()) { + return null; + } + Blob blob = rs.getBlob(1); + InputStream is = blob.getBinaryStream(); + + ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length()); + int ch; + while ((ch = is.read()) >= 0) { + os.write(ch); + } + is.close(); + os.close(); + + return os.toByteArray(); + + } catch (IOException e) { + throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e); } finally { - try { rs.close(); } catch (Throwable e) {} - try { s.close(); } catch (Throwable e) {} - } + try { + rs.close(); + } catch (Throwable ignore) { + } + try { + s.close(); + } catch (Throwable ignore) { + } + } } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Wed Aug 8 11:56:59 2007 @@ -34,10 +34,12 @@ import org.apache.commons.logging.LogFactory; /** - * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter.

sub-classing is - * encouraged to override the default implementation of methods to account for differences in JDBC Driver - * implementations.

The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations.

- * The databases/JDBC drivers that use this adapter are: + * Implements all the default JDBC operations that are used by the + * JDBCPersistenceAdapter.

sub-classing is encouraged to override the + * default implementation of methods to account for differences in JDBC Driver + * implementations.

The JDBCAdapter inserts and extracts BLOB data using + * the getBytes()/setBytes() operations.

The databases/JDBC drivers that + * use this adapter are: *

    *
  • *
@@ -46,381 +48,390 @@ * * @version $Revision: 1.10 $ */ -public class DefaultJDBCAdapter implements JDBCAdapter{ +public class DefaultJDBCAdapter implements JDBCAdapter { - private static final Log log=LogFactory.getLog(DefaultJDBCAdapter.class); + private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class); protected Statements statements; - protected boolean batchStatments=true; + protected boolean batchStatments = true; - protected void setBinaryData(PreparedStatement s,int index,byte data[]) throws SQLException{ - s.setBytes(index,data); + protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { + s.setBytes(index, data); } - protected byte[] getBinaryData(ResultSet rs,int index) throws SQLException{ + protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException { return rs.getBytes(index); } - public void doCreateTables(TransactionContext c) throws SQLException,IOException{ - Statement s=null; - try{ - // Check to see if the table already exists. If it does, then don't log warnings during startup. - // Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version + public void doCreateTables(TransactionContext c) throws SQLException, IOException { + Statement s = null; + try { + // Check to see if the table already exists. If it does, then don't + // log warnings during startup. + // Need to run the scripts anyways since they may contain ALTER + // statements that upgrade a previous version // of the table - boolean alreadyExists=false; - ResultSet rs=null; - try{ - rs=c.getConnection().getMetaData().getTables(null,null,statements.getFullMessageTableName(), - new String[] { "TABLE" }); - alreadyExists=rs.next(); - }catch(Throwable ignore){ - }finally{ + boolean alreadyExists = false; + ResultSet rs = null; + try { + rs = c.getConnection().getMetaData().getTables(null, null, + statements.getFullMessageTableName(), + new String[] {"TABLE"}); + alreadyExists = rs.next(); + } catch (Throwable ignore) { + } finally { close(rs); } - s=c.getConnection().createStatement(); - String[] createStatments=statements.getCreateSchemaStatements(); - for(int i=0;i D.LAST_ACKED_ID" +" ORDER BY M.ID"); - * s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName); + * Useful for debugging. public void dumpTables(Connection c, String + * destinationName, String clientId, String subscriptionName) throws + * SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); + * printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); + * PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID + * FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND + * D.CLIENT_ID=? AND D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID > + * D.LAST_ACKED_ID" +" ORDER BY M.ID"); s.setString(1,destinationName); + * s.setString(2,clientId); s.setString(3,subscriptionName); * printQuery(s,System.out); } * - * public void dumpTables(Connection c) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", - * System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); } + * public void dumpTables(Connection c) throws SQLException { printQuery(c, + * "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c, "Select * from + * ACTIVEMQ_ACKS", System.out); } * - * private void printQuery(Connection c, String query, PrintStream out) throws SQLException { - * printQuery(c.prepareStatement(query), out); } + * private void printQuery(Connection c, String query, PrintStream out) + * throws SQLException { printQuery(c.prepareStatement(query), out); } * - * private void printQuery(PreparedStatement s, PrintStream out) throws SQLException { + * private void printQuery(PreparedStatement s, PrintStream out) throws + * SQLException { * - * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData metaData = set.getMetaData(); for( int i=1; i<= - * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); out.print(metaData.getColumnName(i)+"||"); } - * out.println(); while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { if(i==1) out.print("|"); - * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {} - * try { s.close(); } catch (Throwable ignore) {} } } + * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData + * metaData = set.getMetaData(); for( int i=1; i<= + * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); + * out.print(metaData.getColumnName(i)+"||"); } out.println(); + * while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { + * if(i==1) out.print("|"); out.print(set.getString(i)+"|"); } + * out.println(); } } finally { try { set.close(); } catch (Throwable + * ignore) {} try { s.close(); } catch (Throwable ignore) {} } } */ - } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java Wed Aug 8 11:56:59 2007 @@ -26,12 +26,12 @@ import org.apache.activemq.util.ByteArrayInputStream; /** - * This JDBCAdapter inserts and extracts BLOB data using the + * This JDBCAdapter inserts and extracts BLOB data using the * setBinaryStream()/getBinaryStream() operations. * * The databases/JDBC drivers that use this adapter are: *
    - *
  • Axion
  • + *
  • Axion
  • *
* * @org.apache.xbean.XBean element="streamJDBCAdapter" @@ -39,12 +39,13 @@ * @version $Revision: 1.2 $ */ public class StreamJDBCAdapter extends DefaultJDBCAdapter { - + /** - * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#getBinaryData(java.sql.ResultSet, int) + * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#getBinaryData(java.sql.ResultSet, + * int) */ protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException { - + try { InputStream is = rs.getBinaryStream(index); ByteArrayOutputStream os = new ByteArrayOutputStream(1024 * 4); @@ -58,15 +59,16 @@ return os.toByteArray(); } catch (IOException e) { - throw (SQLException)new SQLException("Error reading binary parameter: "+index).initCause(e); + throw (SQLException)new SQLException("Error reading binary parameter: " + index).initCause(e); } } - + /** - * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#setBinaryData(java.sql.PreparedStatement, int, byte[]) + * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#setBinaryData(java.sql.PreparedStatement, + * int, byte[]) */ protected void setBinaryData(PreparedStatement s, int index, byte[] data) throws SQLException { s.setBinaryStream(index, new ByteArrayInputStream(data), data.length); } - + }