activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jayasreeb <jayasre...@infosys.com>
Subject Problem in sending SOPA with Attachment using Active MQ
Date Thu, 16 Oct 2008 14:33:12 GMT

Hi,

My requirement is to place SOAP Message with Attachment(binary data- Java
object) in JMS. I am using fuse-message-broker-5.0.0.17.

I have a producer program which creates a soap message with
attachment(single attachment) using data handler and places it using
SOAPMessageIntoJMSMessage.When i check the count of Attachment in Producer
class its giving 1 before placing the SOAP Message into JMS Message.

I have COnsumer class which picks up the same JMS message and displays the
content of soap message. In this class when i check the attachment count its
giving 0.

Do i need to do any settings in Active MQ/ consumer.java to accept the
attachment?

I have placed following jars in my workspace.
1.activation.jar
2.axis.jar(1.4)
3.imq.jar
4.imqxm.jar
5.saaj.jar
6.mail.jar
7.wsdl4j-1.5.1.jar
8.jaxm-api.jar
9.jaxrpc.jar
10.commons-discovery-0.2.jar
11.commons-logging-1.0.4.jar


Please find both producer and consumer class details. Please suggest me
solution for this problem.


Producer.java
import java.awt.datatransfer.DataFlavor;
import java.io.File;
import java.net.URL;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.soap.MessageFactory;
import javax.activation.DataHandler;
import javax.activation.DataSource;
import javax.activation.FileDataSource;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.mail.util.ByteArrayDataSource;
import javax.xml.soap.AttachmentPart;
import javax.xml.soap.SOAPBody;
import javax.xml.soap.SOAPBodyElement;
import javax.xml.soap.SOAPEnvelope;
import javax.xml.soap.SOAPMessage;
import javax.xml.soap.SOAPPart;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;
import org.w3c.dom.Document;
import com.sun.messaging.xml.MessageTransformer;

public class Producer {
	private Destination destination;
    private int messageCount = 10;
    private long sleepTime;
    private boolean verbose = true;
    private int messageSize = 255;
    private long timeToLive;
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private String subject = "TOOL1.DEFAULT";
    private boolean topic;
    private boolean transacted;
    private boolean persistent;
    public static void main(String[] args) {
        Producer producerTool = new Producer();
        producerTool.run();
    }
    public void run() {
        Connection connection = null;
        try {
            System.out.println("Connecting to URL: " + url);
            System.out.println("Publishing a Message with size " +
messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
            System.out.println("Using " + (persistent ? "persistent" :
"non-persistent") + " messages");
            System.out.println("Sleeping between publish " + sleepTime + "
ms");
            if (timeToLive != 0) {
                System.out.println("Messages time to live " + timeToLive + "
ms");
            }

            // Create the connection.
         ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(user, password, url);
          //  ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://blrkec38454d.ad.infosys.com:61616");
            
            connection = connectionFactory.createConnection();
            connection.start();

            // Create the session
            Session session = connection.createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
            if (topic) {
                destination = session.createTopic(subject);
            } else {
                destination = session.createQueue(subject);
            }

            // Create the producer.
            MessageProducer producer = session.createProducer(destination);
            if (persistent) {
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            } else {
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            }
            if (timeToLive != 0) {
                producer.setTimeToLive(timeToLive);
            }

            // Start sending messages
            sendLoop(session, producer);

            System.out.println("Done.");

            // Use the ActiveMQConnection interface to dump the connection
            // stats.
            ActiveMQConnection c = (ActiveMQConnection)connection;
            c.getConnectionStats().dump(new IndentPrinter());

        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        } finally {
            try {
                connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
    protected void sendLoop(Session session, MessageProducer producer)
throws Exception {

        //for (int i = 0; i < messageCount || messageCount == 0; i++) {
       try{
    	   System.out.println("in send loop");
          

            /*construct a default soap MessageFactory */
            MessageFactory mf = MessageFactory.newInstance();
            
            /* Create a SOAP message object.*/
            SOAPMessage soapMessage = mf.createMessage();
            
          // Test with objects
            
            Person p = new Person();
            Address a = new Address();
            Address b = new Address();
            p.setLname("Jayasree");
            p.setMname("");
            p.setLname("Balasubramanian");
            a.setAdline1("infosys");
            a.setAdline2("");
            a.setCity("mangalore");
            a.setCountry("india");
            a.setState("");
            p.setAddress(a);
            


          
           
               DocumentBuilderFactory factory =
        	   DocumentBuilderFactory.newInstance();
        	   factory.setNamespaceAware(true);
        
        	   DocumentBuilder builder =
        	   factory.newDocumentBuilder();
        	   Document document = builder.parse( new
File("C:/Address_Std.xml") );
       
//        	    document can be any XML document
        	   SOAPBody soapBody = soapMessage.getSOAPBody();
        	   SOAPBodyElement docElement =  soapBody.addDocument(document);
        	   
        	   
        	   
        	          	   
        	          	   
        	 //Create an attachment with the Java Framework Activation API
        	    URL url = new URL("http://java.sun.com/webservices/");
        	    DataHandler dh = new DataHandler (url);
        	    AttachmentPart ap = soapMessage.createAttachmentPart(dh);

        	    //Set content type and ID
        	    ap.setContentType("text/html");
        	    ap.setContentId("cid-001");

        	    //Add attachment to the SOAP message
        	    soapMessage.addAttachmentPart(ap);
        	    soapMessage.saveChanges();

        	        	  

    // add attachment to message      
        	   
           soapMessage.saveChanges();
           
           soapMessage.writeTo(System.out);
           System.out.println("inside producer----att
count---"+soapMessage.countAttachments());
            Message m =
MessageTransformer.SOAPMessageIntoJMSMessage(soapMessage, session );
            System.out.println("Display the SOAP message"+m);
            producer.send(m);
            if (transacted) {
                session.commit();
            }

           // Thread.sleep(sleepTime);

       
    }catch(Exception e){
    	System.out.println("exception-->"+e);
    	e.printStackTrace();
    }
    }
    

   
    public void setPersistent(boolean durable) {
        this.persistent = durable;
    }

    public void setMessageCount(int messageCount) {
        this.messageCount = messageCount;
    }

    public void setMessageSize(int messageSize) {
        this.messageSize = messageSize;
    }

    public void setPassword(String pwd) {
        this.password = pwd;
    }

    public void setSleepTime(long sleepTime) {
        this.sleepTime = sleepTime;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public void setTimeToLive(long timeToLive) {
        this.timeToLive = timeToLive;
    }

    public void setTopic(boolean topic) {
        this.topic = topic;
    }

    public void setQueue(boolean queue) {
        this.topic = !queue;
    }

    public void setTransacted(boolean transacted) {
        this.transacted = transacted;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public void setVerbose(boolean verbose) {
        this.verbose = verbose;
    }
}


Consumer.java

 import java.io.IOException;
import java.util.Arrays;

import javax.activation.DataHandler;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.xml.soap.AttachmentPart;
import javax.xml.soap.MessageFactory;
import javax.xml.soap.Name;
import javax.xml.soap.SOAPBody;
import javax.xml.soap.SOAPBodyElement;
import javax.xml.soap.SOAPException;
import javax.xml.soap.SOAPMessage;
import javax.xml.transform.Source;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import com.sun.messaging.xml.MessageTransformer;

public class Consumer implements MessageListener, ExceptionListener {
	private boolean running;

    private Session session;
    private Destination destination;
    private MessageProducer replyProducer;

    private boolean pauseBeforeShutdown;
    private boolean verbose = true;
    private int maxiumMessages;
    private String subject = "TOOL1.DEFAULT";
    private boolean topic;
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private boolean transacted;
    private boolean durable;
    private String clientId;
    private int ackMode = Session.AUTO_ACKNOWLEDGE;
    private String consumerName = "James";
    private long sleepTime;
    private long receiveTimeOut;
    Name bodyName;

    public static void main(String[] args) {
        Consumer consumerTool = new Consumer();
        consumerTool.run();
    }

    public void run() {
        try {
            running = true;

            System.out.println("Connecting to URL: " + url);
            System.out.println("Consuming " + (topic ? "topic" : "queue") +
": " + subject);
            System.out.println("Using a " + (durable ? "durable" :
"non-durable") + " subscription");

            ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(user, password, url);
            Connection connection = connectionFactory.createConnection();
            if (durable && clientId != null && clientId.length() > 0 &&
!"null".equals(clientId)) {
                connection.setClientID(clientId);
            }
            connection.setExceptionListener(this);
            connection.start();

            session = connection.createSession(transacted, ackMode);
            if (topic) {
                destination = session.createTopic(subject);
            } else {
                destination = session.createQueue(subject);
            }

            replyProducer = session.createProducer(null);
            replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            MessageConsumer consumer = null;
            if (durable && topic) {
                consumer =
session.createDurableSubscriber((Topic)destination, consumerName);
            } else {
                consumer = session.createConsumer(destination);
            }
            System.out.println("before  and  calling the consume method");
            if (maxiumMessages > 0) {
            	 System.out.println("Inside if part");
                consumeMessagesAndClose(connection, session, consumer);
            } else {
                if (receiveTimeOut == 0) {
                	System.out.println("Inside else if part");
                    consumer.setMessageListener(this);
                } else {
                	System.out.println("Inside else part");
                    consumeMessagesAndClose(connection, session, consumer,
receiveTimeOut);
                }
            }

        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    public void onMessage(Message message) {
        try {

        	    	MessageFactory messageFactory = MessageFactory.newInstance();
            	
            	SOAPMessage soapMessage =
            		  MessageTransformer.SOAPMessageFromJMSMessage(
message,messageFactory );
            	soapMessage.writeTo(System.out); 
            	
 //Extract the content of the reply
  System.out.println("Attachment count in consumer
---->"+soapMessage.countAttachments());
 
 
 

 
 
 java.util.Iterator iterator = soapMessage.getAttachments();
 
 while (iterator.hasNext()) {
	
 	
	 
     DataHandler dh = ((AttachmentPart)iterator.next()).getDataHandler();
    Object obj = dh.getContent();
   //  if(null != fname)
       // return new File(fname);
   
 }
 

 Source sourceContent = soapMessage.getSOAPPart().getContent();
 //Set the output for the transformation
 
 StreamResult result = new StreamResult(System.out);

    	TransformerFactory transformerFactory = 
    TransformerFactory.newInstance();

Transformer transformer = 
 transformerFactory.newTransformer();
 transformer.transform(sourceContent, result);


            if (message.getJMSReplyTo() != null) {
            
                replyProducer.send(message.getJMSReplyTo(),
session.createTextMessage("Reply: " + message.getJMSMessageID()));
            }

            if (transacted) {
            	
                session.commit();
            } else if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
                message.acknowledge();
            }

        } catch (JMSException e) {
           /* System.out.println("Caught JMS Error Code: " +
e.getErrorCode());
            System.out.println("Caught JMS: " + e.getLocalizedMessage());
            System.out.println("Caught JMS: " + e.getMessage());*/
            e.printStackTrace();
        } 
        catch (Exception e) {
        	
             System.out.println("Caught JMS: " + e.getMessage());
        	e.printStackTrace();
        	
        }
        	finally {
        }
            if (sleepTime > 0) {
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                }
            }
        }
    

    public synchronized void onException(JMSException ex) {
        System.out.println("JMS Exception occured.  Shutting down client.");
        running = false;
    }

    synchronized boolean isRunning() {
        return running;
    }

    protected void consumeMessagesAndClose(Connection connection, Session
session, MessageConsumer consumer) throws JMSException, IOException {
        System.out.println("We are about to wait until we consume: " +
maxiumMessages + " message(s) then we will shutdown");
        System.out.println("Inside method part");
        for (int i = 0; i < maxiumMessages && isRunning();) {
            Message message = consumer.receive(1000);
            if (message != null) {
                i++;
                onMessage(message);
            }
        }
        System.out.println("Closing connection");
        consumer.close();
        session.close();
        connection.close();
        if (pauseBeforeShutdown) {
            System.out.println("Press return to shut down");
            System.in.read();
        }
    }

    protected void consumeMessagesAndClose(Connection connection, Session
session, MessageConsumer consumer, long timeout) throws JMSException,
IOException {
        System.out.println("We will consume messages while they continue to
be delivered within: " + timeout + " ms, and then we will shutdown");

        Message message;
        while ((message = consumer.receive(timeout)) != null) {
            onMessage(message);
        }

        System.out.println("Closing connection");
        consumer.close();
        session.close();
        connection.close();
        if (pauseBeforeShutdown) {
            System.out.println("Press return to shut down");
            System.in.read();
        }
    }

    public void setAckMode(String ackMode) {
        if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
            this.ackMode = Session.CLIENT_ACKNOWLEDGE;
        }
        if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
            this.ackMode = Session.AUTO_ACKNOWLEDGE;
        }
        if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
            this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
        }
        if ("SESSION_TRANSACTED".equals(ackMode)) {
            this.ackMode = Session.SESSION_TRANSACTED;
        }
    }

    public void setClientId(String clientID) {
        this.clientId = clientID;
    }

    public void setConsumerName(String consumerName) {
        this.consumerName = consumerName;
    }

    public void setDurable(boolean durable) {
        this.durable = durable;
    }

    public void setMaxiumMessages(int maxiumMessages) {
        this.maxiumMessages = maxiumMessages;
    }

    public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) {
        this.pauseBeforeShutdown = pauseBeforeShutdown;
    }

    public void setPassword(String pwd) {
        this.password = pwd;
    }

    public void setReceiveTimeOut(long receiveTimeOut) {
        this.receiveTimeOut = receiveTimeOut;
    }

    public void setSleepTime(long sleepTime) {
        this.sleepTime = sleepTime;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public void setTopic(boolean topic) {
        this.topic = topic;
    }

    public void setQueue(boolean queue) {
        this.topic = !queue;
    }

    public void setTransacted(boolean transacted) {
        this.transacted = transacted;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public void setVerbose(boolean verbose) {
        this.verbose = verbose;
    }

}




-- 
View this message in context: http://www.nabble.com/Problem-in-sending-SOPA-with-Attachment-using-Active-MQ-tp20014726p20014726.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message