activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From karl de Boer <karl.de.b...@jnc.nl>
Subject Re: Persistence Adapter Postgres JDBC issue
Date Wed, 13 Feb 2008 18:10:12 GMT


Actually, the database error was caused by the producer code. Can anybody
point out what is wrong with it?
I don't use the failover transport, and I am closing both the session and
the producer.
If I don't use createSession(true, Session.SESSION_TRANSACTED), I run out of
DB connections after 40 messages, which is also the default for PostgreSQL.
The maximum set in the datasource is apparently not used.

Can anybody point out a mistake in the following code (a thread with a
loop)? Is it me or ActiveMQ?


    // ========== Static variables ================

    /** Pause between two publish attempts, in milliseconds. */
    protected static long SLEEP_INTERVAL = 1000; //  1 seconds

    /**
     * Stop flag polled by the run() loop. It is set from a different thread via
     * stopProcessing(), so it is volatile to guarantee that the looping thread
     * sees the update.
     */
    protected static volatile boolean stopProcessing = false;

    // ============= Private variables ====================
    protected static Log log = LogFactory.getLog(WitsTestMessageGeneratorThread.class);
    protected static Log sysadminLog = LogFactory.getLog(Constants.SYSADMIN_LOGGER);

    /**
     * this boolean controls whether an alert has already been sent. It prevents this class
     * bombarding administrators with email alerts
     */
    protected boolean errorAlerted = false;

    /** Consecutive checks that found no open connection; reset once a connection exists. */
    protected int errorCount = 0;

    protected javax.jms.Connection connection;
    protected javax.jms.Destination destination;
    protected Session session;
    protected MessageProducer producer;
    protected boolean queueInitialized;

    private String user = PrimeConnection.DEFAULT_USER;
    private String password = PrimeConnection.DEFAULT_PASSWORD;
    private String url = PrimeConnection.DEFAULT_BROKER_URL;
    private String destinationName = PrimeConnection.DESTINATION_NAME;
    private boolean transacted = true;
    private boolean persistent = true;

    /** Producer time-to-live; 0 means no time-to-live is set on the producer. */
    private long timeToLive;
	
    WitsTestMessageGeneratorThread() {
        super();
    }

   
    /**
     * Requests that the processing loop end by raising the shared stop flag.
     * The flag is static, so it is written through the class name to make
     * that explicit.
     */
    public synchronized void stopProcessing() {
        WitsTestMessageGeneratorThread.stopProcessing = true;
    }

    // ============= Public methods

    public void run() {

        stopProcessing = false;
        errorAlerted = false;
    	long counter = 0L;
    	try {
	        while (!stopProcessing) {
	            if (log.isDebugEnabled()) {
	            	log.debug("WitsTestMessageGeneratorThread");
	            }
	            try {
	            	if (!checkEntryConditions()) {
		                if (log.isDebugEnabled()) {
		                	log.debug("WitsTestMessageGeneratorThread entry
conditions not met, trying to create a connection");
		                }
		            	try {
							initQueue();
						} catch (JMSException e) {
							log.error("Error initializing queue: " + e.getMessage(),e);				
							throw e;
						}
						
		                if (log.isDebugEnabled()) {
		                	log.debug("Connection created");
		                }
						
	            	}
	            	if (checkEntryConditions()) {
		        	    try {
		                    // Create the session
		                    session = connection.createSession(transacted,
Session.SESSION_TRANSACTED);
		                    destination = session.createQueue(destinationName);
		                    // Create the producer.
		                    producer = session.createProducer(destination);
			                if (log.isDebugEnabled()) {
			                	log.debug("Session and Producer created");
			                }
		                    if (persistent) {
		                        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		                    } else {
		                       
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		                    }
		                    if (timeToLive != 0) {
		                        producer.setTimeToLive(timeToLive);
		                    }
		                    counter = counter + 1;
		            	    String msgContent = "Test Message " + counter + ". Date:
" + new Date();
							TextMessage msg = session.createTextMessage();
							msg.setText(msgContent);
							producer.send(msg);
			                log.debug("Message " + msgContent + " sent to " +
destinationName);
							if (transacted) {
								session.commit();
							}
						} 
		        	    catch (JMSException e) {
							log.error("Error sending message to " + destinationName + ". Error: "
+ e.getMessage());
							throw e;
						}
		        	    finally {
							if (session != null) session.close();
							session = null;
							if (producer != null) producer.close();
							producer = null;
		        	    }
						if (errorAlerted) {
			           		sysadminLog.error("WitsTestMessageGenerator resumed sending
messages to the queue");
						}
		                errorAlerted = false;
	            	}
	            }
	            catch (Exception e) {
	        		log.error("WitsTestMessageGenerator cannot send a jms message to
the queue. Message: " + e.getMessage(),e);
	        		if (connection != null) {
	        			try {
							connection.close();
						} 
	        			catch (JMSException e1) {}
						connection = null;
						destination = null;
	        		}
	        		if (!errorAlerted && errorCount >= 2) {
	            		sysadminLog.error("WitsTestMessageGenerator cannot send a jms
message to the queue. Message: " + e.getMessage());
	            		errorAlerted = true;
	            	}
	            }
	            
	            
	            long sleepInterval = getSleepInterval();
	            try {
	                sleep(sleepInterval);
	            }
	            catch (InterruptedException e) {
	            }
	
	        }
    	}
    	finally {
	    	if (connection != null) {
				try {
					connection.close();
				} 
				catch (JMSException e1) {}
				connection = null;
			}
    	}
    	log.info("Processing of WitsTestMessageGenerator thread stopped");
        try {
        	if (session != null) session.close();
			if (connection != null) {
				connection.close();
			}
			if (producer != null) producer.close();
		} catch (JMSException e) {
			log.error("Error closing JMS connection: " + e.getMessage());
		}
    }


    /**
     * Returns the pause between two publish attempts.
     *
     * @return the current value of {@code SLEEP_INTERVAL}, in milliseconds
     */
    protected static synchronized long getSleepInterval() {
        final long interval = SLEEP_INTERVAL;
        return interval;
    }
    
    
    public void initQueue() throws JMSException {
    	
        connection = null;
        System.out.println("Connecting to URL: " + url);
        System.out.println("Publishing to queue: " + destinationName);
        System.out.println("Using " + (persistent ? "persistent" :
"non-persistent") + " messages");
        System.out.println("Sleeping between publish " + SLEEP_INTERVAL + "
ms");
        if (timeToLive != 0) {
            System.out.println("Messages time to live " + timeToLive + "
ms");
        }
        // Create the connection.
        ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(user, password, url);
        connection = connectionFactory.createConnection();
        connectionFactory = null;
	    errorCount = 0;
     }
    
     
     /**
      * Reports whether a connection is currently available and updates the
      * error counter accordingly: the counter is reset to zero when a
      * connection exists and incremented by one when it does not.
      *
      * @return {@code true} if {@code connection} is non-null
      */
     protected boolean checkEntryConditions() {
        final boolean connected = (this.connection != null);
        if (connected) {
            errorCount = 0;
        } else {
            errorCount++;
        }
        return connected;
     }
     

-- 
View this message in context: http://www.nabble.com/Persistence-Adapter-Postgres-JDBC-issue-tp15459381s2354p15462064.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message