commons-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject cvs commit: jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger SessionFactory.java
Date Mon, 08 Jul 2002 16:19:06 GMT
jstrachan    2002/07/08 09:19:06

  Modified:    messenger/src/java/org/apache/commons/messagelet Main.java
                        BridgeMDO.java
               messenger/src/conf MessengerSpiritWave.xml
               messenger/src/java/org/apache/commons/messenger
                        SessionFactory.java
  Added:       messenger/src/java/org/apache/commons/messenger/tool
                        StopWatchMessageListener.java
  Log:
  Numerous patches applied. 
  
  Firstly the BridgeMDO will now work in a transacted mode or not based on the mode of the
Messenger connection.
  
  The transacted mode is set via the <factory> or <jndi> elements in the Messenger.xml.
e.g.
  
  <msessenger name="foo">
  	<factory className="com.acme.MySessionFactory" transacted="true">
  
  or
  
  <msessenger name="foo">
  	<jndi lookupName="foo" transacted="true">
  
  Also there is a simple StopWatchMessageListener that can be useful for doing simple timings
of Message processing.
  Also added a Messagelet Main.java which is a simple command line program that can be fired
off from Ant, Maven, Jelly that can consume a number of messages and delegate to MessageListener
and MDOs for processing of the messages, without requiring a Servlet engine.
  This is fine for quickly running some samples etc.
  
  Revision  Changes    Path
  1.2       +20 -2     jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/Main.java
  
  Index: Main.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/Main.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Main.java	8 Jul 2002 14:37:36 -0000	1.1
  +++ Main.java	8 Jul 2002 16:19:05 -0000	1.2
  @@ -27,6 +27,7 @@
   import org.apache.commons.messagelet.impl.Subscription;
   import org.apache.commons.messagelet.impl.SubscriptionDigester;
   import org.apache.commons.messagelet.impl.SubscriptionList;
  +import org.apache.commons.messagelet.tool.StopWatchMessageListener;
   import org.apache.commons.messenger.Messenger;
   import org.apache.commons.messenger.MessengerManager;
   
  @@ -41,7 +42,7 @@
   public class Main {
   
       /** Logger */
  -    private static final Log log = LogFactory.getLog(DistributeBridgeMDO.class);
  +    private static final Log log = LogFactory.getLog(Main.class);
   
       /** The JMS connections */    
       private MessengerManager manager;
  @@ -54,6 +55,10 @@
       
       /** The URI where subscriptions are loaded from */
       private String subscriptionsConfig = "subscriptions.xml";
  +
  +    /** Should we use a stopwatch to output performance metrics */
  +    private boolean useStopWatch = true;
  +
       
       public static void main(String[] args) {
           Main main = new Main();
  @@ -113,7 +118,7 @@
       
       // Properties
       //-------------------------------------------------------------------------    
  -    
  +
       public String getConnectionsConfig() {
           return connectionsConfig;
       }
  @@ -173,6 +178,8 @@
               mdo.init( getServletContext() );
           }
           
  +        listener = wrapInStopWatch( listener );
  +        
           String subject = subscription.getSubject();
           if ( subject == null || subject.length() == 0 ) {
               throw new JMSException( "No destination defined for subscription: " + subscription
);
  @@ -239,4 +246,15 @@
       protected ServletContext getServletContext() {
           return null;
       }
  +    
  +    /**
  +     * Allows the MessageListener to be wrapped inside a stop watch message listener if
required 
  +     */
  +    protected MessageListener wrapInStopWatch( MessageListener listener ) {
  +        if ( useStopWatch ) {
  +            return new StopWatchMessageListener( listener );
  +        }
  +        return listener;
  +    }
  +        
   }
  
  
  
  1.5       +66 -19    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/BridgeMDO.java
  
  Index: BridgeMDO.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/BridgeMDO.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- BridgeMDO.java	8 Jul 2002 15:15:24 -0000	1.4
  +++ BridgeMDO.java	8 Jul 2002 16:19:05 -0000	1.5
  @@ -65,7 +65,11 @@
       
       /** the buffer size used for ByteMessage and StreamMessage copying */
       private int bufferSize = 32 * 1024;
  -        
  +
  +    /** should this MDO work in transacted mode */
  +    private boolean transacted = false;
  +    
  +    
       public BridgeMDO() {
       }
       
  @@ -80,15 +84,27 @@
               if ( outputMessenger == null ) {
                   throw new ServletException( "No output Messenger is defined for this Bridge"
);
               }
  -            validateOutputDestination();
               
               // enable transacted mode 
  -            messenger.getSessionFactory().setTransacted( true );     
  -            outputMessenger.getSessionFactory().setTransacted( true );     
  +            boolean tran1 = messenger.getSessionFactory().isTransacted();
  +            boolean tran2 = outputMessenger.getSessionFactory().isTransacted();
  +            
  +            if ( tran1 != tran2 ) {
  +                throw new ServletException( 
  +                    "Both the input and output Messenger must have the same transacted
mode. "
  +                    + "Input is: " + tran1 + " output is: " + tran2 
  +                );
  +            }
  +            transacted = tran1;
               
               // use client acknowledgement
  -            messenger.getSessionFactory().setAcknowledgeMode( Session.CLIENT_ACKNOWLEDGE
);     
  -            outputMessenger.getSessionFactory().setAcknowledgeMode( Session.CLIENT_ACKNOWLEDGE
);     
  +            
  +            // ### This should be specified in the Messenger.xml file
  +            //messenger.getSessionFactory().setAcknowledgeMode( Session.CLIENT_ACKNOWLEDGE
);     
  +            //outputMessenger.getSessionFactory().setAcknowledgeMode( Session.CLIENT_ACKNOWLEDGE
);     
  +
  +            validateOutputDestination();
  +            
           }
           catch (JMSException e) {
               log.error( "Caught exception trying to configure the transacted, client acknowledge
modes of the JMS connections" );
  @@ -126,6 +142,22 @@
       
       // Properties
       //-------------------------------------------------------------------------
  +    
  +    /** 
  +     * @return true if this MDO should work in transacted mode
  +     */
  +    public boolean isTransacted() {
  +        return transacted;
  +    }
  +
  +    /**
  +     * Sets whether this MDO should work in transacted mode
  +     */    
  +    public void setTransacted(boolean transacted) {
  +        this.transacted = transacted;
  +    }
  +    
  +        
       public String getOutputConnection() {
           return outputConnection;
       }
  @@ -217,8 +249,15 @@
        * output Messenger.
        */
       protected void commit() throws JMSException {
  -        getOutputMessenger().commit();
  -        getMessenger().commit();
  +        if ( transacted ) {
  +            Messenger outputMessenger = getOutputMessenger();
  +            Messenger inputMessenger = getMessenger();
  +            
  +            if ( outputMessenger != inputMessenger ) {
  +                outputMessenger.commit();
  +            }
  +            inputMessenger.commit();
  +        }
       }
   
       /**
  @@ -226,17 +265,25 @@
        * output Messenger.
        */
       protected void rollback() {
  -        try {
  -            getOutputMessenger().rollback();
  -        }
  -        catch (Exception e) {
  -            log.error( "Caught exception rolling back the output messenger: " + e, e );
  -        }
  -        try {
  -            getMessenger().rollback();
  -        }
  -        catch (Exception e) {
  -            log.error( "Caught exception rolling back the input messenger: " + e, e );
  +        if ( transacted ) {
  +            try {
  +                Messenger outputMessenger = getOutputMessenger();
  +                Messenger inputMessenger = getMessenger();
  +                
  +                if ( outputMessenger != inputMessenger ) {
  +                        outputMessenger.rollback();
  +                }
  +            }
  +            catch (Exception e) {
  +                log.error( "Caught exception rolling back the output messenger: " + e,
e );
  +            }
  +            
  +            try {
  +                getMessenger().rollback();
  +            }
  +            catch (Exception e) {
  +                log.error( "Caught exception rolling back the input messenger: " + e, e
);
  +            }
           }
       }
   
  
  
  
  1.4       +2 -2      jakarta-commons-sandbox/messenger/src/conf/MessengerSpiritWave.xml
  
  Index: MessengerSpiritWave.xml
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/conf/MessengerSpiritWave.xml,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- MessengerSpiritWave.xml	17 May 2002 15:05:46 -0000	1.3
  +++ MessengerSpiritWave.xml	8 Jul 2002 16:19:05 -0000	1.4
  @@ -2,11 +2,11 @@
   <manager>
   
     <messenger name="topic">
  -    <factory className="com.spirit.messenger.WaveTopicSessionFactory">
  +    <factory className="com.spirit.messenger.WaveTopicSessionFactory" transacted="true">
       </factory>
     </messenger>
     <messenger name="queue">
  -    <factory className="com.spirit.messenger.WaveQueueSessionFactory">
  +    <factory className="com.spirit.messenger.WaveQueueSessionFactory" transacted="true">
       </factory>
     </messenger>
   </manager>
  
  
  
  1.1                  jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/tool/StopWatchMessageListener.java
  
  Index: StopWatchMessageListener.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE file.
   * 
   * $Id: MessengerMDO.java,v 1.4 2002/05/17 15:05:46 jstrachan Exp $
   */
  
  package org.apache.commons.messagelet.tool;
  
  import javax.jms.Message;
  import javax.jms.MessageListener;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  
  /**
   * A simple StopWatch Message Listener for wrapping another MessageListener
   * to determine its performance.
   *
   * @author  James Strachan
   */
  public class StopWatchMessageListener implements MessageListener {
  
      /** The Log to which logging calls will be made. */
      private Log log = LogFactory.getLog( StopWatchMessageListener.class );
      
      /** the underlying MessageListener */
      private MessageListener listener;
  
      /** the number of messages processed */
      private int count;
          
      /** the message group size */
      private int groupSize = 1000;
          
      /** the time that the batch started processing */
      private long startTime;
              
      public StopWatchMessageListener(MessageListener listener) {
          this.listener = listener;
      }
      
      // MessageListener interface
      //-------------------------------------------------------------------------        
           
      public void onMessage(Message message) {
          if ( count == 0 ) {
              startTime = System.currentTimeMillis();
          }
          listener.onMessage(message);
          
          if ( ++count == groupSize ) {
              long elapsed = System.currentTimeMillis() - startTime;            
              double timePerMessage = elapsed / count;
              double messagesPerSecond = groupSize / timePerMessage;
              
              
              log.info( "Time to process " + count + " messages: " + elapsed + " millis" );
              log.info( "Average time per message: " + timePerMessage + " millis" );
              log.info( "Average number of messages per second: " + messagesPerSecond );
              count = 0;
          }
      }
  
      
      // Properties
      //-------------------------------------------------------------------------        
       
      
      /**
       * @return the number of messages in the group before the performance statistics are
logged
       */
      public int getGroupSize() {
          return groupSize;
      }    
          
      /**
       * Sets the number of messages in the group before the performance statistics are logged
       */
      public void setGroupSize(int groupSize) {
          this.groupSize = groupSize;
      }    
      
      
      /**
       * @return the logger to which statistic messages will be sent
       */
      public Log getLog() {
          return log;
      }
      
      /**
       * Sets the logger to which statistic messages will be sent
       */
      public void setLog(Log log) {
          this.log = log;
      }
          
      
  }
  
  
  
  1.13      +9 -2      jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/SessionFactory.java
  
  Index: SessionFactory.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/SessionFactory.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- SessionFactory.java	17 May 2002 15:05:45 -0000	1.12
  +++ SessionFactory.java	8 Jul 2002 16:19:06 -0000	1.13
  @@ -64,6 +64,13 @@
   
       /** Creates a new Session instance */
       public Session createSession(Connection connection) throws JMSException {
  +        if ( log.isDebugEnabled() ) {
  +            log.debug( 
  +                "Creating a JMS session in transacted mode: " 
  +                + isTransacted() + " with ack mode: " + getAcknowledgeMode() 
  +            );
  +        }
  +        
           if (topic) {
               TopicConnection topicConnection = (TopicConnection) connection;
               return topicConnection.createTopicSession(isTransacted(), getAcknowledgeMode());
  
  
  

--
To unsubscribe, e-mail:   <mailto:commons-dev-unsubscribe@jakarta.apache.org>
For additional commands, e-mail: <mailto:commons-dev-help@jakarta.apache.org>


Mime
View raw message