activemq-users mailing list archives

From millerkdm <miller...@yahoo.com>
Subject Re: included: fully functional (almost) quote producer
Date Tue, 24 Oct 2006 13:25:38 GMT

Thanks James! I did not know about the single-producer thing. I will
definitely give it a try.
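
For reference, here is a minimal sketch of the bookkeeping the single-producer
suggestion implies: one shared sender plus a Map of destinations keyed by topic
name. In JMS terms the sender would be a MessageProducer created with
session.createProducer(null) and used via producer.send(destination, message).
In this sketch the destination factory and the sender are hypothetical
stand-ins passed in as functions, so it runs without a broker. The class name
SingleSenderRouter and its methods are illustrative only, not part of ActiveMQ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;

/**
 * Routes all messages through one shared sender and keeps only a cheap
 * destination object per subscribed topic (instead of a producer per topic).
 * D = destination type, M = message type; both are placeholders.
 */
class SingleSenderRouter<D, M> {
    // topic name -> cached destination; present only while there are subscribers
    private final Map<String, D> destinations = new ConcurrentHashMap<>();
    // stand-in for something like session.createTopic(name)
    private final Function<String, D> destinationFactory;
    // stand-in for something like producer.send(destination, message)
    private final BiConsumer<D, M> sender;

    SingleSenderRouter(Function<String, D> destinationFactory, BiConsumer<D, M> sender) {
        this.destinationFactory = destinationFactory;
        this.sender = sender;
    }

    /** Called when a consumer advisory arrives for the topic. */
    void addSubscription(String topic) {
        destinations.computeIfAbsent(topic, destinationFactory);
    }

    /** Called when a no-consumer advisory arrives for the topic. */
    void removeSubscription(String topic) {
        destinations.remove(topic);
    }

    /** Sends only if the topic currently has a subscriber; returns whether it sent. */
    boolean publish(String topic, M message) {
        D destination = destinations.get(topic);
        if (destination == null) {
            return false; // nobody is listening, drop the quote
        }
        sender.accept(destination, message);
        return true;
    }
}
```

Removing a topic then just drops a map entry; there is no per-topic publisher
left to close.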

I have thought about the last-message thing before, and I don't think it
will work: there are a couple hundred thousand symbols, most of which
will never have a subscriber. So when a user subscribes to a new
symbol, there may be no last message to send. And since memory consumption is
definitely an issue with ActiveMQ, I do not want to keep all those quotes
around in memory anyway. Also, since some quotes will not update for
hours, or even days, I need to make sure the user always gets the last
quote. My best solution so far is to have the client request the
last quote when it subscribes to a symbol. Not elegant, but simple.



James.Strachan wrote:
> 
> Looks cool! :)
> 
> One way you could improve it is just using a single MessageProducer to
> send messages; if you create a JMS MessageProducer passing in null for
> the destination, you can then specify the destination each time you
> send a message. Then you can just keep around a Map of Destinations
> indexed by String - rather than having a full JMS MessageProducer for
> each destination.
> 
> Another thought is - you might want to enable last image subscription
> recovery; so whenever someone subscribes to a quote, they immediately
> get the last image sent to them before any more updates are received.
> 
> http://incubator.apache.org/activemq/subscription-recovery-policy.html
> 
> I wonder if something vaguely like a quote server would be a useful
> utility add-on to distribute with ActiveMQ? I can imagine quite a few
> folks wishing to bridge some kind of quotes/market data to an ActiveMQ
> network
> 
> 
> On 10/20/06, millerkdm <millerkdm@yahoo.com> wrote:
>>
>> If someone had posted something like this, I would have saved
>> weeks of work.
>>
>> 1.      This is a fully-functional quote publisher (with quote objects,
>>         db stuff, and a few other things removed).
>> 2.      It has an embedded broker.
>> 3.      When quotes come in, it will publish quotes only if there is a
>>         subscriber for a quote.
>> 4.      It will remove publishers when there are no longer clients
>>         subscribed to a topic.
>> 5.      It sets memory and prefetch limits.
>> 6.      It evicts messages for slow consumers.
>> 7.      It includes a temporary topic request for the most recent quote,
>>         and replies to only the single requesting consumer.
>>
>>
>> Any recommendations on how to improve would be appreciated.
>>
>> enjoy.
>>
>>
>>
>> package server;
>>
>> import java.io.BufferedReader;
>> import java.io.IOException;
>> import java.io.InputStreamReader;
>> import java.io.Serializable;
>> import java.net.DatagramPacket;
>> import java.net.InetAddress;
>> import java.net.MulticastSocket;
>> import java.net.UnknownHostException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.HashMap;
>> import javax.jms.DeliveryMode;
>> import javax.jms.Destination;
>> import javax.jms.JMSException;
>> import javax.jms.Message;
>> import javax.jms.MessageListener;
>> import javax.jms.Topic;
>> import messages.BarsRequestMessage;
>> import messages.QuoteRequestMessage;
>> import messages.TicksRequestMessage;
>> import org.apache.activemq.ActiveMQConnection;
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.activemq.ActiveMQPrefetchPolicy;
>> import org.apache.activemq.ActiveMQSession;
>> import org.apache.activemq.ActiveMQTopicPublisher;
>> import org.apache.activemq.ActiveMQTopicSession;
>> import org.apache.activemq.ActiveMQTopicSubscriber;
>> import org.apache.activemq.advisory.AdvisorySupport;
>> import org.apache.activemq.broker.BrokerService;
>> import
>> org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
>> import
>> org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
>> import org.apache.activemq.broker.region.policy.PolicyEntry;
>> import org.apache.activemq.broker.region.policy.PolicyMap;
>> import org.apache.activemq.command.ActiveMQDestination;
>> import org.apache.activemq.command.ActiveMQMessage;
>> import org.apache.activemq.command.ActiveMQObjectMessage;
>> import org.apache.activemq.command.ActiveMQTopic;
>> import org.apache.activemq.command.ConsumerInfo;
>> import org.apache.activemq.memory.CacheEntryList;
>> import org.apache.activemq.memory.CacheEvictionUsageListener;
>> import org.apache.activemq.memory.UsageManager;
>> import org.apache.log4j.Logger;
>>
>> public class QuoteProducer implements MessageListener
>> {
>>         protected static final DateFormat dateTimeFormat = new
>> SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
>>         protected static final String tcpURL = "tcp://localhost:3000";
>>         protected static final String vmURL = "vm://localhost:4000";
>>         static Logger logger = Logger.getLogger(QuoteProducer.class);
>>         protected ActiveMQConnection connection = null;
>>         protected ActiveMQTopicSession session = null;
>>         protected HashMap<String, ActiveMQTopicPublisher> topicMap;
>>         protected BrokerService broker;
>>         protected InetAddress multiCastReceiveAddress = null;
>>         protected int multiCastReceivePort;
>>         protected MulticastSocket multiCastReceiveSocket = null;
>>         protected QuoteListener feedThread;
>>
>>         public QuoteProducer()
>>         {
>>                 try
>>                 {
>>                        
>> //PropertyConfigurator.configure("log4j.properties");
>>
>>                         topicMap = new HashMap<String,
>> ActiveMQTopicPublisher>();
>>                         broker = createBroker();
>>                         connection = createConnection();
>>                         session = createSession(connection);
>>                         createTopicsListener();
>>                         createTemporaryTopicListener();
>>                         feedThread = new QuoteListener("238.0.0.9",
>> 12344);
>>                 }
>>                 catch (Exception e)
>>                 {
>>                         logger.error(e);
>>                         e.printStackTrace();
>>                 }
>>         }
>>
>>         BrokerService createBroker()
>>         {
>>                 BrokerService b = null;
>>                 try
>>                 {
>>                         b = new BrokerService();
>>                         PolicyEntry policy = new PolicyEntry();
>>                         ConstantPendingMessageLimitStrategy limitStrategy
>> = new
>> ConstantPendingMessageLimitStrategy();
>>                         limitStrategy.setLimit(1000);
>>                        
>> policy.setPendingMessageLimitStrategy(limitStrategy);
>>                         policy.setMessageEvictionStrategy(new
>> OldestMessageEvictionStrategy());
>>                         policy.setSendAdvisoryIfNoConsumers(true);
>>
>>                         PolicyMap pMap = new PolicyMap();
>>                         pMap.setDefaultEntry(policy);
>>
>>                         b.setDestinationPolicy(pMap);
>>                         b.setPersistent(false);
>>                         b.setUseJmx(false);
>>
>>                         // use the local b here: the broker field is still
>>                         // null until createBroker() returns
>>                         UsageManager um = b.getMemoryManager();
>>                         um.setLimit(1024 * 1024 * 200);
>>                         CacheEvictionUsageListener ceul = new
>> CacheEvictionUsageListener(um, 90,
>> 80, b.getTaskRunnerFactory());
>>
>>                         CacheEntryList cel = new CacheEntryList();
>>                         ceul.add(cel.createFIFOCacheEvictor());
>>                         b.getMemoryManager().addUsageListener(ceul);
>>                         b.setDeleteAllMessagesOnStartup(true);
>>                         b.addConnector(tcpURL);
>>                         b.addConnector(vmURL);
>>                         b.start();
>>                 }
>>                 catch (Exception e)
>>                 {
>>                         logger.error(e);
>>                         e.printStackTrace();
>>                 }
>>
>>                 return b;
>>         }
>>
>>         protected void publishToTopic(String topic, ActiveMQObjectMessage
>> message)
>>         {
>>                 try
>>                 {
>>                         ActiveMQTopicPublisher publisher =
>> topicMap.get(topic);
>>                         if (publisher == null)
>>                                 return;
>>                         publisher.publish(message);
>>                 }
>>                 catch (JMSException e)
>>                 {
>>                         logger.error(e);
>>                         e.printStackTrace();
>>                 }
>>         }
>>
>>         protected boolean subscribersForTopic(String topic)
>>         {
>>                 return (topicMap.containsKey(topic));
>>         }
>>
>>         protected void addSubscription(String topic)
>>         {
>>                 if (topic.startsWith("TEMP") || topic.startsWith("ID")||
>> subscribersForTopic(topic))
>>                         return;
>>
>>                 synchronized (topicMap)
>>                 {
>>                         ActiveMQTopicPublisher publisher = null;
>>                         try
>>                         {
>>                                 publisher = createPublisher(session,
>> topic);
>>                                 topicMap.put(topic, publisher);
>>                                 System.out.println("added topic: " +
>> topic);
>>                         }
>>                         catch (JMSException e)
>>                         {
>>                                 logger.error("failed to create publisher", e);
>>                         }
>>                 }
>>         }
>>
>>         protected void removeSubscription(String topic)
>>         {
>>                 synchronized (topicMap)
>>                 {
>>                         if (!subscribersForTopic(topic))
>>                                 return;
>>
>>                         ActiveMQTopicPublisher publisher =
>> topicMap.remove(topic);
>>                         if (publisher != null)
>>                         {
>>                                 try
>>                                 {
>>                                         publisher.close();
>>                                         System.out.println(
>>                                                 "removed topic: " + topic);
>>                                 }
>>                                 catch (JMSException e)
>>                                 {
>>                                         logger.error("failed to close publisher", e);
>>                                 }
>>                         }
>>                 }
>>         }
>>
>>         protected ActiveMQConnection createConnection() throws
>> JMSException,
>> Exception
>>         {
>>                 ActiveMQConnection c = null;
>>                 try
>>                 {
>>                         ActiveMQPrefetchPolicy prefetchPolicy = new
>> ActiveMQPrefetchPolicy();
>>                         // the pending limit was set twice (1, then 10);
>>                         // only the last call took effect, so keep that one
>>                         prefetchPolicy.setTopicPrefetch(1);
>>                         prefetchPolicy.setMaximumPendingMessageLimit(10);
>>
>>                         ActiveMQConnection con = null;
>>                         ActiveMQConnectionFactory connectionFactory = new
>> ActiveMQConnectionFactory("Data Server",
>>                                        
>> AuthenticatingBroker.serverPassword, vmURL);
>>                         connectionFactory.setAlwaysSessionAsync(true);
>>                         connectionFactory.setUseAsyncSend(true);
>>                         connectionFactory.setOptimizeAcknowledge(true);
>>                        
>> connectionFactory.setDisableTimeStampsByDefault(true);
>>                         connectionFactory.setCopyMessageOnSend(false);
>>                         connectionFactory.setUseCompression(true);
>>                        
>> connectionFactory.setPrefetchPolicy(prefetchPolicy);
>>                         con = (ActiveMQConnection)
>> connectionFactory.createConnection();
>>
>>                         con.start();
>>                         return con;
>>                 }
>>                 catch (Exception e)
>>                 {
>>                         logger.error(e);
>>                         e.printStackTrace();
>>                 }
>>                 return c;
>>         }
>>
>>         protected ActiveMQTopicSession createSession(ActiveMQConnection
>> connection)
>> throws Exception
>>         {
>>                 ActiveMQTopicSession ses = (ActiveMQTopicSession)
>> connection.createTopicSession(false,
>>                                 ActiveMQSession.AUTO_ACKNOWLEDGE);
>>                 return ses;
>>         }
>>
>>         protected ActiveMQTopicPublisher
>> createPublisher(ActiveMQTopicSession
>> session, String msgTopic) throws JMSException
>>         {
>>                 ActiveMQTopic destination = (ActiveMQTopic)
>> session.createTopic(msgTopic);
>>                 ActiveMQTopicPublisher publisher =
>> (ActiveMQTopicPublisher)
>> session.createPublisher(destination);
>>                 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>>                 publisher.setDisableMessageID(true);
>>                 publisher.setDisableMessageTimestamp(true);
>>                 return publisher;
>>         }
>>
>>         protected void createTemporaryTopicListener()
>>         {
>>                 try
>>                 {
>>                         String tempTopic = "TEMP MESSAGE";
>>                         ActiveMQTopic topic = (ActiveMQTopic)
>> session.createTopic(tempTopic);
>>
>>                         ActiveMQTopicSubscriber tempSubscriber =
>> (ActiveMQTopicSubscriber)
>> session
>>                                 .createSubscriber(topic);
>>                         tempSubscriber.setMessageListener(this);
>>
>>                 }
>>                 catch (JMSException e)
>>                 {
>>                         logger.error(e);
>>                         e.printStackTrace();
>>                 }
>>         }
>>         protected void createTopicsListener()
>>         {
>>                 try
>>                 {
>>                         // listen for advisories on all topics
>>                         String msgTopic = ">";
>>                         ActiveMQTopic allTopics = (ActiveMQTopic)
>> session.createTopic(msgTopic);
>>
>>                         ActiveMQTopic noConsumerTopic =
>> AdvisorySupport.getNoTopicConsumersAdvisoryTopic(allTopics);
>>                         ActiveMQTopicSubscriber noConsumerSubscriber =
>> (ActiveMQTopicSubscriber)
>> session
>>                                        
>> .createSubscriber(noConsumerTopic);
>>                         noConsumerSubscriber.setMessageListener(this);
>>
>>                         ActiveMQTopic consumerTopic =
>> AdvisorySupport.getConsumerAdvisoryTopic(allTopics);
>>                         ActiveMQTopicSubscriber consumerSubscriber =
>> (ActiveMQTopicSubscriber)
>> session
>>                                         .createSubscriber(consumerTopic);
>>                         consumerSubscriber.setMessageListener(this);
>>
>>                         /*
>>                         ActiveMQTopic producerTopic =
>> AdvisorySupport.getProducerAdvisoryTopic(allTopics);
>>                         ActiveMQTopicSubscriber producerSubscriber =
>> (ActiveMQTopicSubscriber)
>> session
>>                                         .createSubscriber(producerTopic);
>>                         producerSubscriber.setMessageListener(this);
>>
>>
>>                         ActiveMQTopic connectionTopic =
>> AdvisorySupport.getConnectionAdvisoryTopic();
>>                         ActiveMQTopicSubscriber connectionSubscriber =
>> (ActiveMQTopicSubscriber)
>> session
>>                                        
>> .createSubscriber(connectionTopic);
>>                         connectionSubscriber.setMessageListener(this);
>>                         ActiveMQTopic destinationTopic =
>> AdvisorySupport.getDestinationAdvisoryTopic(allTopics);
>>                         ActiveMQTopicSubscriber destinationSubscriber =
>> (ActiveMQTopicSubscriber)
>> session
>>                                        
>> .createSubscriber(destinationTopic);
>>                         destinationSubscriber.setMessageListener(this);
>>                         */
>>                 }
>>                 catch (JMSException e)
>>                 {
>>                         logger.error(e);
>>                         e.printStackTrace();
>>                 }
>>         }
>>
>>         public static boolean
>> isNoConsumerAdvisoryTopic(ActiveMQDestination
>> destination)
>>         {
>>                 if (destination.isComposite())
>>                 {
>>                         ActiveMQDestination[] compositeDestinations =
>> destination.getCompositeDestinations();
>>                         for (int i = 0; i < compositeDestinations.length;
>> i++)
>>                         {
>>                                 if
>> (isNoConsumerAdvisoryTopic(compositeDestinations[i]))
>>                                 {
>>                                         return true;
>>                                 }
>>                         }
>>                         return false;
>>                 }
>>                 else
>>                 {
>>                         return destination.isTopic()
>>                                         &&
>> destination.getPhysicalName().startsWith(AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX);
>>                 }
>>         }
>>
>>         public static ActiveMQTopic
>> getNoConsumerAdvisoryTopic(ActiveMQDestination
>> destination)
>>         {
>>                 return new
>> ActiveMQTopic(AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX +
>> destination.getPhysicalName());
>>         }
>>
>>         public static String getNoConsumerAdvisoryTopic(String topic)
>>         {
>>                 String ret =
>> topic.substring(AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX.length());
>>                 return ret;
>>         }
>>         public static String getConsumerAdvisoryTopic(String topic)
>>         {
>>                 String ret =
>> topic.substring(AdvisorySupport.TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX.length());
>>                 return ret;
>>         }
>>
>>         public void onMessage(Message message)
>>         {
>>                 ActiveMQMessage activeMessage = (ActiveMQMessage)
>> message;
>>                 ActiveMQTopic destination = (ActiveMQTopic)
>> activeMessage.getDestination();
>>                 System.out.println(destination);
>>
>>                 if (AdvisorySupport.isAdvisoryTopic(destination))
>>                 {
>>                         if
>> (AdvisorySupport.isConsumerAdvisoryTopic(destination))
>>                         {
>>                                 Object command =
>> activeMessage.getDataStructure();
>>                                 if (command != null)
>>                                 {
>>                                         if (command instanceof
>> ConsumerInfo)
>>                                         {
>>                                                 String topic =
>> getConsumerAdvisoryTopic(destination.getPhysicalName());
>>                                                 addSubscription(topic);
>>                                         }
>>                                 }
>>                         }
>>                         else if (isNoConsumerAdvisoryTopic(destination))
>>                         {
>>                                 String topic =
>> getNoConsumerAdvisoryTopic(destination.getPhysicalName());
>>                                 removeSubscription(topic);
>>                         }
>>                 }
>>                 else if (activeMessage instanceof ActiveMQObjectMessage)
>>                 { // for our temporary request topics
>>                         try
>>                         {
>>                                 ActiveMQObjectMessage requestMessage =
>> (ActiveMQObjectMessage) activeMessage;
>>                                 if (requestMessage.getObject() instanceof
>> QuoteRequestMessage)
>>                                 {
>>                                         new
>> QuoteMessageRequester(requestMessage);
>>                                 }
>>                         }
>>                         catch (JMSException e)
>>                         {
>>                                 logger.error(e);
>>                                 e.printStackTrace();
>>                                 return;
>>                         }
>>                 }
>>         }
>>
>>         class QuoteListener extends Thread
>>         {
>>                 InetAddress multiCastReceiveAddress;
>>                 int multiCastReceivePort;
>>                 MulticastSocket multiCastReceiveSocket;
>>                 DatagramPacket packet;
>>
>>                 public QuoteListener(String receiveAddress, int
>> receivePort)
>>                 {
>>                         try
>>                         {
>>                                 this.multiCastReceiveAddress =
>> InetAddress.getByName(receiveAddress);
>>                         }
>>                         catch (UnknownHostException e)
>>                         {
>>                                 logger.error(e);
>>                                 e.printStackTrace();
>>                         }
>>                         this.multiCastReceivePort = receivePort;
>>                         try
>>                         {
>>                                 multiCastReceiveSocket = new
>> MulticastSocket(multiCastReceivePort);
>>                                 multiCastReceiveSocket.setTimeToLive(5);
>>                                
>> multiCastReceiveSocket.joinGroup(multiCastReceiveAddress);
>>                         }
>>                         catch (Exception e)
>>                         {
>>                                 logger.error(e);
>>                                 e.printStackTrace();
>>                         }
>>                         byte[] message = new byte[1000];
>>                         packet = new DatagramPacket(message,
>> message.length);
>>                         start();
>>                 }
>>
>>                 public void run()
>>                 {
>>                         try
>>                         {
>>                                 while (true)
>>                                 {
>>                                         multiCastReceiveSocket.receive(packet);
>>                                         //long start = System.currentTimeMillis();
>>                                         processPacket();
>>                                         //long end = System.currentTimeMillis() - start;
>>                                         //System.out.println("End of wait for " + end + " millis");
>>                                 }
>>                         }
>>                         catch (Exception e)
>>                         {
>>                                 e.printStackTrace();
>>                         }
>>                 }
>>
>>                 public void processPacket()
>>                 {
>>                         try
>>                         {
>>                                 String topic = "TEST.QUOTE";
>>                                 if (!subscribersForTopic(topic))
>>                                         return;
>>
>>                                 String pretendObject= new String(topic +
>> " " + "stuff");
>>                                 ActiveMQObjectMessage objectMessage =
>> (ActiveMQObjectMessage)
>>                                                
>> session.createObjectMessage();
>>                                 objectMessage.setObject(pretendObject);
>>                                 if (topic.length() > 0 && objectMessage
>> != null)
>>                                                        
>> publishToTopic(topic, objectMessage);
>>                         }
>>                         catch (Exception e)
>>                         {
>>                                 logger.error("processPacket failed", e);
>>                         }
>>                 }
>>         }
>>
>>         class QuoteMessageRequester extends Thread
>>         {
>>                 ActiveMQObjectMessage requestMessage;
>>                 QuoteMessageRequester(ActiveMQObjectMessage
>> requestMessage)
>>                 {
>>                         this.requestMessage = requestMessage;
>>                         start();
>>                 }
>>                 public void run()
>>                 {
>>                         try
>>                         {
>>                                 String pretendObject = "Quote " + "stuff";
>>                                 ActiveMQObjectMessage objectMessage =
>> (ActiveMQObjectMessage)
>>                                                
>> session.createObjectMessage();
>>                                 objectMessage.setObject(pretendObject);
>>
>>                                 sendReply(requestMessage, pretendObject);
>>                         }
>>                         catch (Exception e)
>>                         {
>>                                 logger.error(e);
>>                                 e.printStackTrace();
>>                                 return;
>>                         }
>>                 }
>>         }
>>
>>
>>         public synchronized void sendReply(Message in, Serializable out)
>>         {
>>                 try
>>                 {
>>                         ActiveMQConnection con = createConnection();
>>                         ActiveMQTopicSession ses = createSession(con);
>>                         ActiveMQObjectMessage replyMessage =
>> (ActiveMQObjectMessage)
>> ses.createObjectMessage(out);
>>                        
>> replyMessage.setJMSCorrelationID(in.getJMSMessageID());
>>                         Destination replyDestination =
>> in.getJMSReplyTo();
>>                         ActiveMQTopicPublisher pub =
>> (ActiveMQTopicPublisher)
>> ses.createPublisher((Topic) replyDestination);
>>                         pub.publish(replyMessage);
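>>                         pub.close();
>>                         ses.close();
>>                         // createConnection() opened a new connection for
>>                         // this reply, so close it as well
>>                         con.close();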
>>                 }
>>                 catch (Exception e)
>>                 {
>>                         logger.error(e);
>>                         e.printStackTrace();
>>                         return;
>>                 }
>>         }
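>>         public static void main(String[] args)
>>         {
>>                 // start the broker, advisory listeners and feed thread
>>                 new QuoteProducer();
>>                 BufferedReader br = new BufferedReader(new
>> InputStreamReader(System.in));
>>                 String str;
>>                 System.out.println("Enter 'quit' to exit\n");
>>                 do
>>                 {
>>                         str = "";
>>                         try
>>                         {
>>                                 str = br.readLine();
>>                                 if (str == null) // stdin was closed
>>                                         break;
>>                         }
>>                         catch (IOException e)
>>                         {
>>                                 e.printStackTrace();
>>                         }
>>                 }
>>                 // the old check upper-cased the input and then compared
>>                 // it with "quit", which could never match
>>                 while (!str.trim().equalsIgnoreCase("quit"));
>>                 System.exit(0);
>>         }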
>> }
>>
>> --
>> View this message in context:
>> http://www.nabble.com/included%3A-fully-functional-%28almost%29-quote-producer-tf2482597.html#a6922824
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
> 
> 
> -- 
> 
> James
> -------
> http://radio.weblogs.com/0112098/
> 
> 

-- 
View this message in context: http://www.nabble.com/included%3A-fully-functional-%28almost%29-quote-producer-tf2482597.html#a6972682
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

