activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r649192 - in /activemq/trunk/assembly/src/release/example/transactions: ./ README.txt build.xml src/ src/Retailer.java src/Supplier.java src/TransactionsDemo.java src/Vendor.java
Date Thu, 17 Apr 2008 17:20:44 GMT
Author: rajdavies
Date: Thu Apr 17 10:20:34 2008
New Revision: 649192

URL: http://svn.apache.org/viewvc?rev=649192&view=rev
Log:
Apply patch from https://issues.apache.org/activemq/browse/AMQ-1664

Added:
    activemq/trunk/assembly/src/release/example/transactions/
    activemq/trunk/assembly/src/release/example/transactions/README.txt   (with props)
    activemq/trunk/assembly/src/release/example/transactions/build.xml
    activemq/trunk/assembly/src/release/example/transactions/src/
    activemq/trunk/assembly/src/release/example/transactions/src/Retailer.java   (with props)
    activemq/trunk/assembly/src/release/example/transactions/src/Supplier.java   (with props)
    activemq/trunk/assembly/src/release/example/transactions/src/TransactionsDemo.java   (with
props)
    activemq/trunk/assembly/src/release/example/transactions/src/Vendor.java   (with props)

Added: activemq/trunk/assembly/src/release/example/transactions/README.txt
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/transactions/README.txt?rev=649192&view=auto
==============================================================================
--- activemq/trunk/assembly/src/release/example/transactions/README.txt (added)
+++ activemq/trunk/assembly/src/release/example/transactions/README.txt Thu Apr 17 10:20:34
2008
@@ -0,0 +1,40 @@
+Transactions Demo
+=================
+This example is an ActiveMQ implementation of the "TransactedExample" from
+Sun's JMS Tutorial (http://java.sun.com/products/jms/tutorial/index.html).
+
+The example simulates a simplified eCommerce application with four parts:
+the retailer who places the orders, the vendor who assembles the computers,
+and two suppliers--one for hard drives and another for monitors.
+
+The retailer sends a message to the vendor's queue and awaits a reply.
+The vendor receives the message and sends a message to each of the
+supplier's queues. It does this in a single transaction, and will randomly
+throw an exception simulating a database error, triggering a rollback.
+Each supplier receives the order, checks inventory and replies to the
+message stating how many items were sent.
+The vendor collects both responses and responds to the retailer, notifying
+whether it can fulfill the complete order or not.
+The retailer receives the message from the vendor.
+
+Running the Example
+===================
+To run the complete demo in a single JVM, with ActiveMQ running on the local
+computer:
+  ant transactions_demo
+
+If you are running ActiveMQ on a non-standard port, or on a different host,
+you can pass a URL on the command line:
+  ant -Durl=tcp://localhost:61616 transactions_demo
+
+If your ActiveMQ instance is password-protected, you can also pass a
+username and password on the command line:
+  ant -Duser=myusername -Dpassword=supersecret transactions_demo
+
+You can also run the individual components separately, again with optional
+url and/or authentication parameters:
+  ant retailer &
+  ant vendor &
+  ant hdsupplier &
+  ant monitorsupplier &
+

Propchange: activemq/trunk/assembly/src/release/example/transactions/README.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/assembly/src/release/example/transactions/build.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/transactions/build.xml?rev=649192&view=auto
==============================================================================
--- activemq/trunk/assembly/src/release/example/transactions/build.xml (added)
+++ activemq/trunk/assembly/src/release/example/transactions/build.xml Thu Apr 17 10:20:34
2008
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!--
+    Ant build for the transactions example. The default target prints usage;
+    the run targets compile the sources under src and fork one JVM per
+    component, passing the broker url, user and password as program arguments.
+-->
+<project name="transactions" default="help" basedir=".">
+
+    <property name="class.dir" value="target/classes" />
+    <property name="activemq.home" value="../.." />
+
+    <!-- example program defaults -->
+    <property name="url" value="tcp://localhost:61616" />
+    <!-- Prints the available run targets and how to override the broker url. -->
+    <target name="help">
+        <echo>
+      For the full demo:
+         ant transactions_demo -Durl=tcp://hostname:1234
+      For the individual components:
+         ant retailer -Durl=tcp://hostname:1234
+         ant vendor -Durl=tcp://hostname:1234
+         ant hdsupplier -Durl=tcp://hostname:1234
+         ant monitorsupplier -Durl=tcp://hostname:1234
+        </echo>
+    </target>
+
+    <!-- Removes the build output; class.dir is deleted separately in case it was overridden to live outside target. -->
+    <target name="clean">
+        <delete dir="target" quiet="true" />
+        <delete dir="${class.dir}" quiet="true" />
+    </target>
+
+    <!-- Creates the class output directory and defines the classpath shared by javac and the run targets. -->
+    <target name="init">
+        <mkdir dir="${class.dir}" />
+
+        <path id="javac.classpath">
+            <pathelement path="${class.dir}" />
+            <!-- NOTE(review): ../conf is resolved against basedir while activemq.home is ../.. ; confirm whether ${activemq.home}/conf was intended. -->
+            <pathelement path="../conf" />
+            <fileset dir="${activemq.home}/lib">
+                <include name="**/*.jar" />
+            </fileset>
+        </path>
+    </target>
+
+    <target name="compile" depends="init" description="Compile all Java">
+        <javac srcdir="src" destdir="${class.dir}" debug="true">
+            <classpath refid="javac.classpath" />
+        </javac>
+    </target>
+
+    <!--
+        Run targets. Program arguments are positional: url, user, password
+        (the supplier targets put the item name first).
+        NOTE(review): Ant leaves a reference to an unset property unexpanded, so
+        when -Duser / -Dpassword are not supplied the program receives the
+        literal reference text as its user and password arguments; confirm this
+        is acceptable for brokers that enforce authentication.
+    -->
+    <target name="transactions_demo" depends="compile" description="Runs the full demo">
+        <java classname="TransactionsDemo" fork="yes">
+            <classpath refid="javac.classpath" />
+            <jvmarg value="-server" />
+            <sysproperty key="activemq.home" value="${activemq.home}"/>
+            <arg value="${url}" />
+            <arg value="${user}" />
+            <arg value="${password}" />
+        </java>
+    </target>
+
+    <target name="retailer" depends="compile" description="Runs the retailer">
+        <java classname="Retailer" fork="yes">
+            <classpath refid="javac.classpath" />
+            <jvmarg value="-server" />
+            <sysproperty key="activemq.home" value="${activemq.home}"/>
+            <arg value="${url}" />
+            <arg value="${user}" />
+            <arg value="${password}" />
+        </java>
+    </target>
+    <target name="vendor" depends="compile" description="Runs the vendor">
+        <java classname="Vendor" fork="yes">
+            <classpath refid="javac.classpath" />
+            <jvmarg value="-server" />
+            <sysproperty key="activemq.home" value="${activemq.home}"/>
+            <arg value="${url}" />
+            <arg value="${user}" />
+            <arg value="${password}" />
+        </java>
+    </target>
+    <!-- Both supplier targets run the same Supplier class; the first argument selects the item it supplies. -->
+    <target name="hdsupplier" depends="compile" description="Runs the Hard Drive Supplier">
+        <java classname="Supplier" fork="yes">
+            <classpath refid="javac.classpath" />
+            <jvmarg value="-server" />
+            <sysproperty key="activemq.home" value="${activemq.home}"/>
+            <arg value="HardDrive" />
+            <arg value="${url}" />
+            <arg value="${user}" />
+            <arg value="${password}" />
+        </java>
+    </target>
+    <target name="monitorsupplier" depends="compile" description="Runs the Monitor Supplier">
+        <java classname="Supplier" fork="yes">
+            <classpath refid="javac.classpath" />
+            <jvmarg value="-server" />
+            <sysproperty key="activemq.home" value="${activemq.home}"/>
+            <arg value="Monitor" />
+            <arg value="${url}" />
+            <arg value="${user}" />
+            <arg value="${password}" />
+        </java>
+    </target>
+</project>

Added: activemq/trunk/assembly/src/release/example/transactions/src/Retailer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/transactions/src/Retailer.java?rev=649192&view=auto
==============================================================================
--- activemq/trunk/assembly/src/release/example/transactions/src/Retailer.java (added)
+++ activemq/trunk/assembly/src/release/example/transactions/src/Retailer.java Thu Apr 17
10:20:34 2008
@@ -0,0 +1,107 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+import org.apache.activemq.ActiveMQConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+/**
+ * The Retailer orders computers from the Vendor by sending a message via
+ * the VendorOrderQueue. It then synchronously receives the response message
+ * and reports whether the order was successful or not.
+ */
+public class Retailer implements Runnable {
+	private final String url;
+	private final String user;
+	private final String password;
+
+	/**
+	 * @param url      broker URL handed to the connection factory
+	 * @param user     user name handed to the connection factory, may be null
+	 * @param password password handed to the connection factory, may be null
+	 */
+	public Retailer(String url, String user, String password) {
+		this.url = url;
+		this.user = user;
+		this.password = password;
+	}
+
+	/**
+	 * Places five orders of one to four computers each, blocking after every
+	 * order until the reply arrives on a temporary queue, then sends a plain
+	 * (non-map) message to mark the end of the order stream.
+	 * JMS failures are printed; the connection is closed in every case.
+	 */
+	public void run() {
+		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
+		Connection connection = null;
+		try {
+			connection = connectionFactory.createConnection();
+
+			// The Retailer's session is non-transacted.
+			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			Destination vendorOrderQueue = session.createQueue("VendorOrderQueue");
+			TemporaryQueue retailerConfirmQueue = session.createTemporaryQueue();
+
+			MessageProducer producer = session.createProducer(vendorOrderQueue);
+			MessageConsumer replyConsumer = session.createConsumer(retailerConfirmQueue);
+
+			connection.start();
+
+			for (int i = 0; i < 5; i++) {
+				MapMessage message = session.createMapMessage();
+				message.setString("Item", "Computer(s)");
+				int quantity = (int)(Math.random() * 4) + 1;
+				message.setInt("Quantity", quantity);
+				// The reply comes back on this retailer's own temporary queue.
+				message.setJMSReplyTo(retailerConfirmQueue);
+				producer.send(message);
+				System.out.println("Retailer: Ordered " + quantity + " computers.");
+
+				MapMessage reply = (MapMessage) replyConsumer.receive();
+				if (reply.getBoolean("OrderAccepted")) {
+					System.out.println("Retailer: Order Filled");
+				} else {
+					System.out.println("Retailer: Order Not Filled");
+				}
+			}
+
+			// Send a non-MapMessage to signal the end
+			producer.send(session.createMessage());
+
+			replyConsumer.close();
+		} catch (JMSException e) {
+			e.printStackTrace();
+		} finally {
+			// Close on the failure path too, so an exception does not leak the connection.
+			if (connection != null) {
+				try {
+					connection.close();
+				} catch (JMSException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Starts a Retailer on its own thread.
+	 *
+	 * @param args optional positional arguments: url, user, password;
+	 *             url defaults to tcp://localhost:61616, the others to null
+	 */
+	public static void main(String[] args) {
+		String url = "tcp://localhost:61616";
+		String user = null;
+		String password = null;
+
+		if (args.length >= 1) {
+			url = args[0];
+		}
+
+		if (args.length >= 2) {
+			user = args[1];
+		}
+
+		if (args.length >= 3) {
+			password = args[2];
+		}
+
+		Retailer r = new Retailer(url, user, password);
+
+		new Thread(r, "Retailer").start();
+	}
+}

Propchange: activemq/trunk/assembly/src/release/example/transactions/src/Retailer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/assembly/src/release/example/transactions/src/Supplier.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/transactions/src/Supplier.java?rev=649192&view=auto
==============================================================================
--- activemq/trunk/assembly/src/release/example/transactions/src/Supplier.java (added)
+++ activemq/trunk/assembly/src/release/example/transactions/src/Supplier.java Thu Apr 17
10:20:34 2008
@@ -0,0 +1,135 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+/**
+ * The Supplier synchronously receives the order from the Vendor and
+ * randomly responds with either the number ordered, or some lower
+ * quantity. A message that is not a MapMessage marks the end of the
+ * order stream; it is echoed to the reply destination and the supplier stops.
+ */
+public class Supplier implements Runnable {
+	private final String url;
+	private final String user;
+	private final String password;
+	private final String item;
+	private final String queue;
+
+	/**
+	 * @param item     name of the supplied item, used in replies and log output
+	 * @param queue    name of the queue this supplier takes orders from
+	 * @param url      broker URL handed to the connection factory
+	 * @param user     user name handed to the connection factory, may be null
+	 * @param password password handed to the connection factory, may be null
+	 */
+	public Supplier(String item, String queue, String url, String user, String password) {
+		this.url = url;
+		this.user = user;
+		this.password = password;
+		this.item = item;
+		this.queue = queue;
+	}
+
+	/**
+	 * Processes orders until the end-of-stream message arrives. Each order is
+	 * received and answered inside one transaction on a transacted session.
+	 * JMS failures are printed; the connection is closed in every case.
+	 */
+	public void run() {
+		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
+		Connection connection = null;
+		try {
+			connection = connectionFactory.createConnection();
+
+			Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+			Destination orderQueue = session.createQueue(queue);
+			MessageConsumer consumer = session.createConsumer(orderQueue);
+			// One generator for the whole run instead of a new one per message.
+			Random random = new Random();
+
+			connection.start();
+
+			while (true) {
+				Message message = consumer.receive();
+				MessageProducer producer = session.createProducer(message.getJMSReplyTo());
+				if (!(message instanceof MapMessage)) {
+					// End of Stream
+					producer.send(session.createMessage());
+					session.commit();
+					producer.close();
+					break;
+				}
+				MapMessage orderMessage = (MapMessage) message;
+
+				int ordered = orderMessage.getInt("Quantity");
+				System.out.println(item + " Supplier: Vendor ordered " + ordered + " " + orderMessage.getString("Item"));
+
+				MapMessage outMessage = session.createMapMessage();
+				outMessage.setInt("VendorOrderNumber", orderMessage.getInt("VendorOrderNumber"));
+				outMessage.setString("Item", item);
+
+				// Random.nextInt(bound) rejects a non-positive bound, so an order
+				// for zero or fewer items is answered with zero instead of
+				// killing the thread with an IllegalArgumentException.
+				int quantity = 0;
+				if (ordered > 0) {
+					quantity = Math.min(ordered, random.nextInt(ordered * 10));
+				}
+				outMessage.setInt("Quantity", quantity);
+
+				producer.send(outMessage);
+				System.out.println(item + " Supplier: Sent " + quantity + " " + item + "(s)");
+				session.commit();
+				System.out.println(item + " Supplier: committed transaction");
+				producer.close();
+			}
+		} catch (JMSException e) {
+			e.printStackTrace();
+		} finally {
+			// Close on the failure path too, so an exception does not leak the connection.
+			if (connection != null) {
+				try {
+					connection.close();
+				} catch (JMSException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Starts a Supplier on its own thread.
+	 *
+	 * @param args optional positional arguments: item ("HardDrive" or "Monitor",
+	 *             default "HardDrive"), url (default tcp://localhost:61616),
+	 *             user, password
+	 * @throws IllegalArgumentException if the item is neither HardDrive nor Monitor
+	 */
+	public static void main(String[] args) {
+		String url = "tcp://localhost:61616";
+		String user = null;
+		String password = null;
+		String item = "HardDrive";
+
+		if (args.length >= 1) {
+			item = args[0];
+		}
+		String queue;
+		if ("HardDrive".equals(item)) {
+			queue = "StorageOrderQueue";
+		} else if ("Monitor".equals(item)) {
+			queue = "MonitorOrderQueue";
+		} else {
+			throw new IllegalArgumentException("Item must be either HardDrive or Monitor");
+		}
+
+		if (args.length >= 2) {
+			url = args[1];
+		}
+
+		if (args.length >= 3) {
+			user = args[2];
+		}
+
+		if (args.length >= 4) {
+			password = args[3];
+		}
+
+		Supplier s = new Supplier(item, queue, url, user, password);
+
+		new Thread(s, "Supplier " + item).start();
+	}
+}

Propchange: activemq/trunk/assembly/src/release/example/transactions/src/Supplier.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/assembly/src/release/example/transactions/src/TransactionsDemo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/transactions/src/TransactionsDemo.java?rev=649192&view=auto
==============================================================================
--- activemq/trunk/assembly/src/release/example/transactions/src/TransactionsDemo.java (added)
+++ activemq/trunk/assembly/src/release/example/transactions/src/TransactionsDemo.java Thu
Apr 17 10:20:34 2008
@@ -0,0 +1,47 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+/**
+ * Runs the whole demo in a single JVM: one Retailer, one Vendor and the two
+ * Suppliers (hard drives and monitors), each on its own thread.
+ */
+public class TransactionsDemo {
+
+	/**
+	 * @param args optional positional arguments: url, user, password;
+	 *             url defaults to tcp://localhost:61616, the others to null
+	 */
+	public static void main(String[] args) {
+		// Each positional argument overrides its default only when present.
+		String brokerUrl = args.length >= 1 ? args[0] : "tcp://localhost:61616";
+		String userName = args.length >= 2 ? args[1] : null;
+		String secret = args.length >= 3 ? args[2] : null;
+
+		Retailer retailer = new Retailer(brokerUrl, userName, secret);
+		Vendor vendor = new Vendor(brokerUrl, userName, secret);
+		Supplier storageSupplier = new Supplier("HardDrive", "StorageOrderQueue", brokerUrl, userName, secret);
+		Supplier monitorSupplier = new Supplier("Monitor", "MonitorOrderQueue", brokerUrl, userName, secret);
+
+		Thread retailerThread = new Thread(retailer, "Retailer");
+		Thread vendorThread = new Thread(vendor, "Vendor");
+		Thread storageThread = new Thread(storageSupplier, "Supplier 1");
+		Thread monitorThread = new Thread(monitorSupplier, "Supplier 2");
+
+		retailerThread.start();
+		vendorThread.start();
+		storageThread.start();
+		monitorThread.start();
+	}
+
+}

Propchange: activemq/trunk/assembly/src/release/example/transactions/src/TransactionsDemo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/assembly/src/release/example/transactions/src/Vendor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/transactions/src/Vendor.java?rev=649192&view=auto
==============================================================================
--- activemq/trunk/assembly/src/release/example/transactions/src/Vendor.java (added)
+++ activemq/trunk/assembly/src/release/example/transactions/src/Vendor.java Thu Apr 17 10:20:34
2008
@@ -0,0 +1,302 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+/**
+ * The Vendor synchronously, and in a single transaction, receives the
+ * order from VendorOrderQueue and sends messages to the two Suppliers via
+ * MonitorOrderQueue and StorageOrderQueue.
+ * The responses are received asynchronously; when both responses come
+ * back, the order confirmation message is sent back to the Retailer.
+ */
+public class Vendor implements Runnable, MessageListener {
+	private final String url;
+	private final String user;
+	private final String password;
+	// Transacted session used by the asynchronous listener; assigned in run() before the listener is registered.
+	private	Session asyncSession;
+	// Suppliers that have not yet signalled end-of-stream; guarded by supplierLock.
+	private int numSuppliers = 2;
+	private final Object supplierLock = new Object();
+
+	/**
+	 * @param url      broker URL handed to the connection factory
+	 * @param user     user name handed to the connection factory, may be null
+	 * @param password password handed to the connection factory, may be null
+	 */
+	public Vendor(String url, String user, String password) {
+		this.url = url;
+		this.user = user;
+		this.password = password;
+	}
+
+	/**
+	 * Receives retailer orders and forwards one sub-order to each supplier
+	 * inside a single transaction, rolling the transaction back when a
+	 * JMSException occurs (one is thrown at random to simulate a database
+	 * error). A non-map message ends the order stream; it is forwarded to both
+	 * suppliers and the method then waits until both have echoed it back to
+	 * {@link #onMessage(Message)}. Both connections are closed in every case.
+	 */
+	public void run() {
+		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
+		Connection connection = null;
+		Connection asyncconnection = null;
+
+		try {
+			connection = connectionFactory.createConnection();
+
+			Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+			Destination orderQueue = session.createQueue("VendorOrderQueue");
+			Destination monitorOrderQueue = session.createQueue("MonitorOrderQueue");
+			Destination storageOrderQueue = session.createQueue("StorageOrderQueue");
+
+			MessageConsumer orderConsumer = session.createConsumer(orderQueue);
+			MessageProducer monitorProducer = session.createProducer(monitorOrderQueue);
+			MessageProducer storageProducer = session.createProducer(storageOrderQueue);
+
+			// Supplier replies are handled on a second connection with its own transacted session.
+			asyncconnection = connectionFactory.createConnection();
+			asyncSession = asyncconnection.createSession(true, Session.SESSION_TRANSACTED);
+
+			TemporaryQueue vendorConfirmQueue = asyncSession.createTemporaryQueue();
+			MessageConsumer confirmConsumer = asyncSession.createConsumer(vendorConfirmQueue);
+			confirmConsumer.setMessageListener(this);
+
+			asyncconnection.start();
+
+			connection.start();
+
+			Random random = new Random();
+
+			while (true) {
+				try {
+					Message inMessage = orderConsumer.receive();
+					if (!(inMessage instanceof MapMessage)) {
+						// end of stream
+						Message outMessage = session.createMessage();
+						outMessage.setJMSReplyTo(vendorConfirmQueue);
+						monitorProducer.send(outMessage);
+						storageProducer.send(outMessage);
+						session.commit();
+						break;
+					}
+					MapMessage message = (MapMessage) inMessage;
+
+					// Randomly throw an exception in here to simulate a Database error
+					// and trigger a rollback of the transaction
+					if (random.nextInt(3) == 0) {
+						throw new JMSException("Simulated Database Error.");
+					}
+
+					Order order = new Order(message);
+
+					MapMessage orderMessage = session.createMapMessage();
+					orderMessage.setJMSReplyTo(vendorConfirmQueue);
+					orderMessage.setInt("VendorOrderNumber", order.getOrderNumber());
+					int quantity = message.getInt("Quantity");
+					System.out.println("Vendor: Retailer ordered " + quantity + " " + message.getString("Item"));
+
+					// The same message object is sent twice with only the item name changed.
+					orderMessage.setInt("Quantity", quantity);
+					orderMessage.setString("Item", "Monitor");
+					monitorProducer.send(orderMessage);
+					System.out.println("Vendor: ordered " + quantity + " Monitor(s)");
+
+					orderMessage.setString("Item", "HardDrive");
+					storageProducer.send(orderMessage);
+					System.out.println("Vendor: ordered " + quantity + " Hard Drive(s)");
+
+					session.commit();
+					System.out.println("Vendor: Comitted Transaction 1");
+
+				} catch (JMSException e) {
+					System.out.println("Vendor: JMSException Occured: " + e.getMessage());
+					e.printStackTrace();
+					// A failing rollback propagates to the outer handler and ends the run.
+					session.rollback();
+					System.out.println("Vendor: Rolled Back Transaction.");
+				}
+			}
+
+			// Keep the connections open until both suppliers have echoed the end-of-stream marker.
+			synchronized (supplierLock) {
+				try {
+					while (numSuppliers > 0) {
+						supplierLock.wait();
+					}
+				} catch (InterruptedException e) {
+					// Stop waiting and keep the interrupt visible to the caller.
+					Thread.currentThread().interrupt();
+				}
+			}
+
+		} catch (JMSException e) {
+			e.printStackTrace();
+		} finally {
+			// Close on the failure path too, so an exception does not leak either connection.
+			closeQuietly(connection);
+			closeQuietly(asyncconnection);
+		}
+
+	}
+
+	/** Closes the connection if it was opened, printing rather than propagating a failure. */
+	private static void closeQuietly(Connection connection) {
+		if (connection == null) {
+			return;
+		}
+		try {
+			connection.close();
+		} catch (JMSException e) {
+			e.printStackTrace();
+		}
+	}
+
+	/**
+	 * Handles a supplier reply. A non-map message is a supplier's end-of-stream
+	 * marker and only decrements the outstanding-supplier count. A map message
+	 * is recorded against its order; once both suppliers have answered, the
+	 * retailer is told whether the order was accepted.
+	 *
+	 * @param message the reply delivered on the vendor's temporary queue
+	 */
+	public void onMessage(Message message) {
+		if (!(message instanceof MapMessage)) {
+			try {
+				asyncSession.commit();
+			} catch (JMSException e) {
+				e.printStackTrace();
+			}
+			// Count the supplier down even if the commit failed, otherwise run() would wait forever.
+			synchronized(supplierLock) {
+				numSuppliers--;
+				supplierLock.notifyAll();
+			}
+			// Always return here: this message must never reach the MapMessage cast below.
+			return;
+		}
+
+		try {
+			MapMessage componentMessage = (MapMessage) message;
+
+			int orderNumber = componentMessage.getInt("VendorOrderNumber");
+			Order order = Order.getOrder(orderNumber);
+			if (order == null) {
+				// Nothing to update; consume the message instead of failing with a NullPointerException.
+				System.out.println("Vendor: ignoring reply for unknown order " + orderNumber);
+				asyncSession.commit();
+				return;
+			}
+			order.processSubOrder(componentMessage);
+			asyncSession.commit();
+
+			if (! "Pending".equals(order.getStatus())) {
+				System.out.println("Vendor: Completed processing for order " + orderNumber);
+
+				MessageProducer replyProducer = asyncSession.createProducer(order.getMessage().getJMSReplyTo());
+				MapMessage replyMessage = asyncSession.createMapMessage();
+				if ("Fulfilled".equals(order.getStatus())) {
+					replyMessage.setBoolean("OrderAccepted", true);
+					System.out.println("Vendor: sent " + order.quantity + " computer(s)");
+				} else {
+					replyMessage.setBoolean("OrderAccepted", false);
+					System.out.println("Vendor: unable to send " + order.quantity + " computer(s)");
+				}
+				replyProducer.send(replyMessage);
+				asyncSession.commit();
+				System.out.println("Vender: committed transaction 2");
+				// One producer is created per completed order; release it once the reply is committed.
+				replyProducer.close();
+			}
+		} catch (JMSException e) {
+			e.printStackTrace();
+		}
+	}
+
+	/**
+	 * A retailer order together with the two supplier replies collected for it.
+	 * Orders register themselves in a static table keyed by order number. The
+	 * table is written by the thread creating orders and read by the listener
+	 * thread, so every access to it is synchronized on the table itself.
+	 */
+	public static class Order {
+		private static final Map<Integer, Order> pendingOrders = new HashMap<Integer, Order>();
+		// Guarded by pendingOrders.
+		private static int nextOrderNumber = 1;
+
+		private final int orderNumber;
+		private final int quantity;
+		private MapMessage monitor = null;
+		private MapMessage storage = null;
+		private final MapMessage message;
+		private String status;
+
+		/**
+		 * Creates a pending order and registers it under the next order number.
+		 *
+		 * @param message the retailer's order; its "Quantity" entry is the number
+		 *                of computers wanted (0 is used if it cannot be read)
+		 */
+		public Order(MapMessage message) {
+			this.message = message;
+			int requested;
+			try {
+				requested = message.getInt("Quantity");
+			} catch (JMSException e) {
+				e.printStackTrace();
+				requested = 0;
+			}
+			this.quantity = requested;
+			this.status = "Pending";
+			// Register last, so the order is fully initialised before another thread can look it up.
+			synchronized (pendingOrders) {
+				this.orderNumber = nextOrderNumber++;
+				pendingOrders.put(orderNumber, this);
+			}
+		}
+
+		/** @return "Pending", "Fulfilled" or "Cancelled" */
+		public Object getStatus() {
+			return status;
+		}
+
+		/** @return the number this order is registered under */
+		public int getOrderNumber() {
+			return orderNumber;
+		}
+
+		/** @return the number of registered orders; orders are never removed from the table */
+		public static int getOutstandingOrders() {
+			synchronized (pendingOrders) {
+				return pendingOrders.size();
+			}
+		}
+
+		/**
+		 * @param number an order number
+		 * @return the order registered under that number, or null if there is none
+		 */
+		public static Order getOrder(int number) {
+			synchronized (pendingOrders) {
+				return pendingOrders.get(number);
+			}
+		}
+
+		/** @return the retailer message this order was created from */
+		public MapMessage getMessage() {
+			return message;
+		}
+
+		/**
+		 * Records one supplier reply. Once both the monitor and the hard drive
+		 * replies are present, the order becomes "Fulfilled" if each supplier
+		 * delivers at least the ordered quantity, and "Cancelled" otherwise
+		 * (also when a quantity cannot be read).
+		 *
+		 * @param message a supplier reply whose "Item" entry is "Monitor" or
+		 *                "HardDrive"; any other item is ignored
+		 */
+		public void processSubOrder(MapMessage message) {
+			String itemName = null;
+			try {
+				itemName = message.getString("Item");
+			} catch (JMSException e) {
+				e.printStackTrace();
+			}
+
+			if ("Monitor".equals(itemName)) {
+				monitor = message;
+			} else if ("HardDrive".equals(itemName)) {
+				storage = message;
+			}
+
+			if (null != monitor && null != storage) {
+				// Received both messages
+				try {
+					if (quantity > monitor.getInt("Quantity")
+							|| quantity > storage.getInt("Quantity")) {
+						status = "Cancelled";
+					} else {
+						status = "Fulfilled";
+					}
+				} catch (JMSException e) {
+					e.printStackTrace();
+					status = "Cancelled";
+				}
+			}
+		}
+	}
+
+	/**
+	 * Starts a Vendor on its own thread.
+	 *
+	 * @param args optional positional arguments: url, user, password;
+	 *             url defaults to tcp://localhost:61616, the others to null
+	 */
+	public static void main(String[] args) {
+		String url = "tcp://localhost:61616";
+		String user = null;
+		String password = null;
+
+		if (args.length >= 1) {
+			url = args[0];
+		}
+
+		if (args.length >= 2) {
+			user = args[1];
+		}
+
+		if (args.length >= 3) {
+			password = args[2];
+		}
+
+		Vendor v = new Vendor(url, user, password);
+
+		new Thread(v, "Vendor").start();
+	}
+}

Propchange: activemq/trunk/assembly/src/release/example/transactions/src/Vendor.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message