activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r424328 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/ store/jdbc/
Date Fri, 21 Jul 2006 14:10:45 GMT
Author: jstrachan
Date: Fri Jul 21 07:10:45 2006
New Revision: 424328

URL: http://svn.apache.org/viewvc?rev=424328&view=rev
Log:
an implementation of an exclusive lock in SQL to ensure that only one JDBC message store is
run against a database at once to fix AMQ-831. For documentation on this feature see: http://activemq.org/site/jdbc-master-slave.html


Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerServiceAware.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=424328&r1=424327&r2=424328&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Fri Jul 21 07:10:45 2006
@@ -558,6 +558,7 @@
     public PersistenceAdapter getPersistenceAdapter() throws IOException {
         if (persistenceAdapter == null) {
             persistenceAdapter = createPersistenceAdapter();
+            configureService(persistenceAdapter);
         }
         return persistenceAdapter;
     }
@@ -771,6 +772,54 @@
         getPersistenceAdapter().deleteAllMessages();
     }
 
+    public boolean isDeleteAllMessagesOnStartup() {
+        return deleteAllMessagesOnStartup;
+    }
+
+    /**
+     * Sets whether or not all messages are deleted on startup - mostly only
+     * useful for testing.
+     */
+    public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup)
{
+        this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
+    }
+
+    public URI getVmConnectorURI() {
+        if (vmConnectorURI == null) {
+            try {
+                vmConnectorURI = new URI("vm://" + getBrokerName());
+            }
+            catch (URISyntaxException e) {
+            }
+        }
+        return vmConnectorURI;
+    }
+
+    public void setVmConnectorURI(URI vmConnectorURI) {
+        this.vmConnectorURI = vmConnectorURI;
+    }
+
+    /**
+     * @return Returns the shutdownOnMasterFailure.
+     */
+    public boolean isShutdownOnMasterFailure(){
+        return shutdownOnMasterFailure;
+    }
+
+    /**
+     * @param shutdownOnMasterFailure The shutdownOnMasterFailure to set.
+     */
+    public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure){
+        this.shutdownOnMasterFailure=shutdownOnMasterFailure;
+    }
+
+    public boolean isKeepDurableSubsActive() {
+        return keepDurableSubsActive;
+    }
+
+    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
+        this.keepDurableSubsActive = keepDurableSubsActive;
+    }
     // Implementation methods
     // -------------------------------------------------------------------------
     /**
@@ -1132,52 +1181,13 @@
         connector.start();
     }
 
-    public boolean isDeleteAllMessagesOnStartup() {
-        return deleteAllMessagesOnStartup;
-    }
-
     /**
-     * Sets whether or not all messages are deleted on startup - mostly only
-     * useful for testing.
+     * Perform any custom dependency injection
      */
-    public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup)
{
-        this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
-    }
-
-    public URI getVmConnectorURI() {
-        if (vmConnectorURI == null) {
-            try {
-                vmConnectorURI = new URI("vm://" + getBrokerName());
-            }
-            catch (URISyntaxException e) {
-            }
+    protected void configureService(Object service) {
+        if (service instanceof BrokerServiceAware) {
+            BrokerServiceAware serviceAware = (BrokerServiceAware) service;
+            serviceAware.setBrokerService(this);
         }
-        return vmConnectorURI;
-    }
-
-    public void setVmConnectorURI(URI vmConnectorURI) {
-        this.vmConnectorURI = vmConnectorURI;
-    }
-
-    /**
-     * @return Returns the shutdownOnMasterFailure.
-     */
-    public boolean isShutdownOnMasterFailure(){
-        return shutdownOnMasterFailure;
-    }
-
-    /**
-     * @param shutdownOnMasterFailure The shutdownOnMasterFailure to set.
-     */
-    public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure){
-        this.shutdownOnMasterFailure=shutdownOnMasterFailure;
-    }
-
-    public boolean isKeepDurableSubsActive() {
-        return keepDurableSubsActive;
-    }
-
-    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
-        this.keepDurableSubsActive = keepDurableSubsActive;
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerServiceAware.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerServiceAware.java?rev=424328&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerServiceAware.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerServiceAware.java
Fri Jul 21 07:10:45 2006
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+/**
+ * An interface used to represent a component that wants the {@link BrokerService} 
+ * to be injected
+ * 
+ * @version $Revision: $
+ */
+public interface BrokerServiceAware {
+
+    public void setBrokerService(BrokerService brokerService);
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerServiceAware.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java?rev=424328&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
Fri Jul 21 07:10:45 2006
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import org.apache.activemq.Service;
+
+/**
+ * Represents some kind of lock service to ensure that a broker is the only master
+ * 
+ * @version $Revision: $
+ */
+public interface DatabaseLocker extends Service {
+
+    /**
+     * Used by a timer to keep alive the lock.
+     * If the method returns false the broker should be terminated
+     */
+    public boolean keepAlive();
+    
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java?rev=424328&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
Fri Jul 21 07:10:45 2006
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Represents an exclusive lock on a database to avoid multiple brokers
+ * running against the same logical database.
+ * 
+ * @version $Revision: $
+ */
+public class DefaultDatabaseLocker implements DatabaseLocker {
+    private static final Log log = LogFactory.getLog(DefaultDatabaseLocker.class);
+    
+    private final DataSource dataSource;
+    private final Statements statements;
+    private long sleepTime = 1000;
+    private Connection connection;
+
+    public DefaultDatabaseLocker(DataSource dataSource, Statements statements) {
+        this.dataSource = dataSource;
+        this.statements = statements;
+    }
+
+    public void start() throws Exception {
+        log.debug("Attempting to acquire exclusive lock on the database");
+        
+        connection = dataSource.getConnection();
+        connection.setAutoCommit(false);
+        
+        PreparedStatement statement = connection.prepareStatement(statements.getLockCreateStatement());
+        while (true) {
+            try {
+                boolean answer = statement.execute();
+                if (answer) {
+                    break;
+                }
+            }
+            catch (Exception e) {
+                log.error("Failed to acquire lock: " + e, e);
+            }
+            log.info("Sleeping for " + sleepTime + " milli(s) before trying again to get
the lock...");
+            Thread.sleep(sleepTime);
+        }
+        
+        log.info("Becoming the master on dataSource: " + dataSource);
+    }
+
+    public void stop() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+    }
+
+    public boolean keepAlive() {
+        try {
+            PreparedStatement statement = connection.prepareStatement(statements.getLockUpdateStatement());
+            statement.setLong(1, System.currentTimeMillis());
+            int rows = statement.executeUpdate();
+            if (rows == 1) {
+                return true;
+            }
+        }
+        catch (Exception e) {
+            log.error("Failed to update database lock: " + e, e);
+        }
+        return false;
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=424328&r1=424327&r2=424328&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Fri Jul 21 07:10:45 2006
@@ -16,15 +16,15 @@
  */
 package org.apache.activemq.store.jdbc;
 
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.Set;
-
-import javax.sql.DataSource;
+import edu.emory.mathcs.backport.java.util.concurrent.ScheduledFuture;
+import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
 
 import org.apache.activeio.command.WireFormat;
 import org.apache.activeio.util.FactoryFinder;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -40,10 +40,12 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ScheduledFuture;
-import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
-import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Set;
 
 /**
  * A {@link PersistenceAdapter} implementation using JDBC for persistence
@@ -57,19 +59,23 @@
  * 
  * @version $Revision: 1.9 $
  */
-public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter
{
+public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter,
BrokerServiceAware {
 
     private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
     private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/store/jdbc/");
 
     private WireFormat wireFormat = new OpenWireFormat();
+    private BrokerService brokerService;
     private Statements statements;
     private JDBCAdapter adapter;
     private MemoryTransactionStore transactionStore;
     private ScheduledThreadPoolExecutor clockDaemon;
     private ScheduledFuture clockTicket;
-    int cleanupPeriod = 1000 * 60 * 5;
+    private int cleanupPeriod = 1000 * 60 * 5;
     private boolean useExternalMessageReferences;
+    private boolean useDatabaseLock = true;
+    private int lockKeepAlivePeriod = 0;
+    private DatabaseLocker databaseLocker;
 
     public JDBCPersistenceAdapter() {
     }
@@ -156,6 +162,16 @@
         } finally {
             transactionContext.commit();
         }
+        
+        if (isUseDatabaseLock()) {
+            DatabaseLocker service = getDatabaseLocker();
+            if (service == null) {
+                log.warn("No databaseLocker configured for the JDBC Persistence Adapter");
+            }
+            else {
+                service.start();
+            }
+        }
 
         cleanup();
 
@@ -175,6 +191,10 @@
             clockTicket = null;
             clockDaemon.shutdown();
         }
+        DatabaseLocker service = getDatabaseLocker();
+        if (service != null) {
+            service.stop();
+        }
     }
 
     public void cleanup() {
@@ -227,6 +247,36 @@
         return adapter;
     }
 
+    
+    public DatabaseLocker getDatabaseLocker() throws IOException {
+        if (databaseLocker == null) {
+            databaseLocker = createDatabaseLocker();
+            if (lockKeepAlivePeriod > 0) {
+                getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
+                    public void run() {
+                        databaseLockKeepAlive();
+                    }
+                }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
+            }
+        }
+        return databaseLocker;
+    }
+
+    /**
+     * Sets the database locker strategy to use to lock the database on startup
+     */
+    public void setDatabaseLocker(DatabaseLocker databaseLocker) {
+        this.databaseLocker = databaseLocker;
+    }
+    
+    public BrokerService getBrokerService() {
+        return brokerService;
+    }
+
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+
     /**
      * @throws IOException
      */
@@ -342,6 +392,15 @@
         this.useExternalMessageReferences = useExternalMessageReferences;
     }
     
+    
+    public boolean isUseDatabaseLock() {
+        return useDatabaseLock;
+    }
+
+    public void setUseDatabaseLock(boolean useDatabaseLock) {
+        this.useDatabaseLock = useDatabaseLock;
+    }
+
     static public void log(String msg, SQLException e) {
         String s = msg+e.getMessage();
         while( e.getNextException() != null ) {
@@ -368,4 +427,37 @@
     public void setUsageManager(UsageManager usageManager) {
     }
 
+
+    protected void databaseLockKeepAlive() {
+        boolean stop = false;
+        try {
+            DatabaseLocker locker = getDatabaseLocker();
+            if (locker != null) {
+                if (!locker.keepAlive()) {
+                    stop = true;
+                }
+            }
+        }
+        catch (IOException e) {
+            log.error("Failed to get database when trying keepalive: " + e, e);
+        }
+        if (stop) {
+            stopBroker();
+        }
+    }
+
+    protected void stopBroker() {
+        // we can no longer keep the lock so lets fail
+        log.info("No longer able to keep the exclusive lock so giving up being a master");
+        try {
+            brokerService.stop();
+        }
+        catch (Exception e) {
+            log.warn("Failed to stop broker");
+        }
+    }
+
+    protected DatabaseLocker createDatabaseLocker() throws IOException {
+        return new DefaultDatabaseLocker(getDataSource(), getStatements());
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=424328&r1=424327&r2=424328&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
Fri Jul 21 07:10:45 2006
@@ -27,6 +27,7 @@
     private String tablePrefix = "";
     protected String messageTableName = "ACTIVEMQ_MSGS";
     protected String durableSubAcksTableName = "ACTIVEMQ_ACKS";
+    protected String lockTableName = "ACTIVEMQ_LOCK";
 
     protected String binaryDataType = "BLOB";
     protected String containerNameDataType = "VARCHAR(250)";
@@ -57,6 +58,9 @@
     private String deleteOldMessagesStatement;
     private String[] createSchemaStatements;
     private String[] dropSchemaStatements;
+    private String lockCreateStatement;
+    private String lockUpdateStatement;
+    private boolean useLockCreateWhereClause;
 
     public String[] getCreateSchemaStatements() {
         if (createSchemaStatements == null) {
@@ -75,7 +79,11 @@
                     "CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " + containerNameDataType
+ " NOT NULL"
                             + ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME
" + stringIdDataType
                             + " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID
" + sequenceDataType
-                            + ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))", };
+                            + ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))", 
+                    "CREATE TABLE " + getFullLockTableName() + "( ID " + longDataType + ",
TIME " + longDataType 
+                            + ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )",
+                    "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)", 
+            };
         }
         return createSchemaStatements;
     }
@@ -220,12 +228,30 @@
     public String getDeleteOldMessagesStatement() {
         if (deleteOldMessagesStatement == null) {
             deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
-                    + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID <= " +
"( SELECT min(" + getFullAckTableName()
-                    + ".LAST_ACKED_ID) " + "FROM " + getFullAckTableName() + " WHERE " +
getFullAckTableName()
-                    + ".CONTAINER=" + getFullMessageTableName() + ".CONTAINER)";
+            + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID <= " + "( SELECT
min(" + getFullAckTableName()
+            + ".LAST_ACKED_ID) " + "FROM " + getFullAckTableName() + " WHERE " + getFullAckTableName()
+            + ".CONTAINER=" + getFullMessageTableName() + ".CONTAINER)";
         }
         return deleteOldMessagesStatement;
     }
+    
+    public String getLockCreateStatement() {
+        if (lockCreateStatement == null) {
+            lockCreateStatement = "SELECT * FROM " + getFullLockTableName();
+            if (useLockCreateWhereClause) {
+                lockCreateStatement += " WHERE ID = 1";
+            }
+            lockCreateStatement += " FOR UPDATE";
+        }
+        return lockCreateStatement;
+    }
+    
+    public String getLockUpdateStatement() {
+        if (lockUpdateStatement == null) {
+            lockUpdateStatement = "UPDATE " + getFullLockTableName() + " SET time = ? WHERE
ID = 1";
+        }
+        return lockUpdateStatement;
+    }
 
     public String getFullMessageTableName() {
         return getTablePrefix() + getMessageTableName();
@@ -234,7 +260,12 @@
     public String getFullAckTableName() {
         return getTablePrefix() + getDurableSubAcksTableName();
     }
+    
+    public String getFullLockTableName() {
+        return getTablePrefix() + getLockTableName();
+    }
 
+    
     /**
      * @return Returns the containerNameDataType.
      */
@@ -339,6 +370,14 @@
     public void setDurableSubAcksTableName(String durableSubAcksTableName) {
         this.durableSubAcksTableName = durableSubAcksTableName;
     }
+    
+    public String getLockTableName() {
+        return lockTableName;
+    }
+
+    public void setLockTableName(String lockTableName) {
+        this.lockTableName = lockTableName;
+    }
 
     public String getLongDataType() {
         return longDataType;
@@ -444,4 +483,19 @@
         this.updateMessageStatement = updateMessageStatment;
     }
 
+    public boolean isUseLockCreateWhereClause() {
+        return useLockCreateWhereClause;
+    }
+
+    public void setUseLockCreateWhereClause(boolean useLockCreateWhereClause) {
+        this.useLockCreateWhereClause = useLockCreateWhereClause;
+    }
+
+    public void setLockCreateStatement(String lockCreateStatement) {
+        this.lockCreateStatement = lockCreateStatement;
+    }
+
+    public void setLockUpdateStatement(String lockUpdateStatement) {
+        this.lockUpdateStatement = lockUpdateStatement;
+    }
 }



Mime
View raw message