Return-Path: Delivered-To: apmail-activemq-users-archive@www.apache.org Received: (qmail 87110 invoked from network); 29 Aug 2009 16:51:59 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 29 Aug 2009 16:51:59 -0000 Received: (qmail 66645 invoked by uid 500); 29 Aug 2009 16:51:59 -0000 Delivered-To: apmail-activemq-users-archive@activemq.apache.org Received: (qmail 66625 invoked by uid 500); 29 Aug 2009 16:51:58 -0000 Mailing-List: contact users-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: users@activemq.apache.org Delivered-To: mailing list users@activemq.apache.org Received: (qmail 66615 invoked by uid 99); 29 Aug 2009 16:51:58 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 29 Aug 2009 16:51:58 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=10.0 tests=SPF_HELO_PASS,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: domain of lists@nabble.com designates 216.139.236.158 as permitted sender) Received: from [216.139.236.158] (HELO kuber.nabble.com) (216.139.236.158) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 29 Aug 2009 16:51:49 +0000 Received: from isper.nabble.com ([192.168.236.156]) by kuber.nabble.com with esmtp (Exim 4.63) (envelope-from ) id 1MhR9E-0005bx-Ru for users@activemq.apache.org; Sat, 29 Aug 2009 09:51:28 -0700 Message-ID: <25204452.post@talk.nabble.com> Date: Sat, 29 Aug 2009 09:51:28 -0700 (PDT) From: moonbird To: users@activemq.apache.org Subject: Re: ActiveMQ c++ and Java In-Reply-To: <1251468419.19274.8.camel@localhost> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7bit X-Nabble-From: moonbird@ymail.com References: <25190274.post@talk.nabble.com> <1251468419.19274.8.camel@localhost> X-Virus-Checked: Checked by ClamAV on apache.org this is my java application: ... // Create connection. Create session from connection; false means // session is not transacted. Create requestor and text message. Send // messages, wait for answer and finally close session and connection. try { queueConnection = queueConnectionFactory.createQueueConnection(); queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); // obviously this is needed ??? why doesnt it work to lookup the activemq queue via jndi ? stationInfoQueue = queueSession.createQueue("stationInfoQueue"); textMessage = queueSession.createTextMessage(myXMLRequestAsString); // javax.jms.QueueRequestor creates a TemporaryQueue for the responses and provides a request method that sends the request message // and waits for its reply.This is a basic request/reply abstraction that should be sufficient for most uses. // JMS providers and clients are free to create more sophisticated versions. queueRequestor = new QueueRequestor(queueSession, stationInfoQueue); //sends the message and waits until respond is received TextMessage answer = (TextMessage) queueRequestor.request(textMessage); System.out.println("CLIENT: Response message received: "); System.out.println(answer.getText()); } catch (JMSException e) { System.out.println("JMSExceptionn occurred:" + e); } ... and this is my c++ application (it is a modified version of the apache example SimpleAsyncConsumer.cpp): /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace activemq; using namespace activemq::core; using namespace decaf::lang; using namespace decaf::util; using namespace decaf::util::concurrent; using namespace cms; using namespace std; //////////////////////////////////////////////////////////////////////////////// class SimpleAsyncConsumer : public ExceptionListener, public MessageListener { private: Connection* connection; Session* session; Destination* destination; MessageConsumer* consumer; bool useTopic; bool clientAck; std::string brokerURI; std::string destURI; public: SimpleAsyncConsumer( const std::string& brokerURI, const std::string& destURI, bool useTopic = false, bool clientAck = false ) { connection = NULL; session = NULL; destination = NULL; consumer = NULL; this->useTopic = useTopic; this->brokerURI = brokerURI; this->destURI = destURI; this->clientAck = clientAck; } virtual ~SimpleAsyncConsumer(){ cleanup(); } void runConsumer() { try { // Create a ConnectionFactory ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory( brokerURI ); // Create a Connection connection = connectionFactory->createConnection(); delete connectionFactory; connection->start(); connection->setExceptionListener(this); // Create a Session if( clientAck ) { session = connection->createSession( Session::CLIENT_ACKNOWLEDGE ); } else { session = connection->createSession( Session::AUTO_ACKNOWLEDGE ); } // Create the destination (Topic or Queue) if( useTopic ) { destination = session->createTopic( destURI ); } else { destination = session->createQueue( destURI ); } // Create a MessageConsumer from the Session to the Topic or Queue consumer = session->createConsumer( destination ); consumer->setMessageListener( this ); } catch (CMSException& e) { e.printStackTrace(); } } // Called from the consumer since this class is a registered MessageListener. virtual void onMessage( const Message* message ){ static int count = 0; try { count++; const TextMessage* textMessage = dynamic_cast< const TextMessage* >( message ); string text = ""; if( textMessage != NULL ) { text = textMessage->getText(); } else { text = "NOT A TEXTMESSAGE!"; } if( clientAck ) { message->acknowledge(); } printf( "Message #%d Received: %s\n", count, text.c_str() ); //sendReply(textMessage); } catch (CMSException& e) { e.printStackTrace(); } sendReply(message); } // If something bad happens you see it here as this class is also been // registered as an ExceptionListener with the connection. virtual void onException( const CMSException& ex AMQCPP_UNUSED) { printf("CMS Exception occurred. Shutting down client.\n"); exit(1); } void sendReply(const Message* msg) { printf( "started sendrReply()"); try { // Create a ConnectionFactory auto_ptr connectionFactory( new ActiveMQConnectionFactory( brokerURI ) ); try { connection = connectionFactory->createConnection(); connection->start(); } catch( CMSException& e ) { e.printStackTrace(); } // Create a Session session = connection->createSession( Session::AUTO_ACKNOWLEDGE ); // Create the destination (Topic or Queue) destination = const_cast(msg->getCMSReplyTo()); // Create a MessageProducer from the Session to the Topic or Queue MessageProducer* producer = session->createProducer( destination ); producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); // Create a messages string text = (string)"Hello World back"; TextMessage* message = session->createTextMessage( text ); // Tell the producer to send the message producer->send( message ); delete message; } catch ( CMSException& e ) { e.printStackTrace(); } } private: void cleanup(){ //************************************************* // Always close destination, consumers and producers before // you destroy their sessions and connection. //************************************************* // Destroy resources. try{ if( destination != NULL ) delete destination; }catch (CMSException& e) { e.printStackTrace(); } destination = NULL; try{ if( consumer != NULL ) delete consumer; }catch (CMSException& e) { e.printStackTrace(); } consumer = NULL; // Close open resources. try{ if( session != NULL ) session->close(); if( connection != NULL ) connection->close(); }catch (CMSException& e) { e.printStackTrace(); } // Now Destroy them try{ if( session != NULL ) delete session; }catch (CMSException& e) { e.printStackTrace(); } session = NULL; try{ if( connection != NULL ) delete connection; }catch (CMSException& e) { e.printStackTrace(); } connection = NULL; } }; //////////////////////////////////////////////////////////////////////////////// int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) { activemq::library::ActiveMQCPP::initializeLibrary(); std::cout << "=====================================================\n"; std::cout << "Starting the example:" << std::endl; std::cout << "-----------------------------------------------------\n"; // Set the URI to point to the IPAddress of your broker. // add any optional params to the url to enable things like // tightMarshalling or tcp logging etc. See the CMS web site for // a full list of configuration options. // // http://activemq.apache.org/cms/ // // Wire Format Options: // ===================== // Use either stomp or openwire, the default ports are different for each // // Examples: // tcp://127.0.0.1:61616 default to openwire // tcp://127.0.0.1:61616?wireFormat=openwire same as above // tcp://127.0.0.1:61613?wireFormat=stomp use stomp instead // std::string brokerURI = "failover:(tcp://127.0.0.1:61616" // "?wireFormat=openwire" // "&connection.useAsyncSend=true" // "&transport.commandTracingEnabled=true" // "&transport.tcpTracingEnabled=true" // "&wireFormat.tightEncodingEnabled=true" ")"; //============================================================ // This is the Destination Name and URI options. Use this to // customize where the consumer listens, to have the consumer // use a topic or queue set the 'useTopics' flag. //============================================================ //std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1"; std::string destURI = "stationInfoQueue"; //?consumer.prefetchSize=1"; //============================================================ // set to true to use topics instead of queues // Note in the code above that this causes createTopic or // createQueue to be used in the consumer. //============================================================ bool useTopics = false; //============================================================ // set to true if you want the consumer to use client ack mode // instead of the default auto ack mode. //============================================================ bool clientAck = false; // Create the consumer SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck ); // Start it up and it will listen forever. consumer.runConsumer(); // Wait to exit. std::cout << "Press 'q' to quit" << std::endl; while( std::cin.get() != 'q') {} std::cout << "-----------------------------------------------------\n"; std::cout << "Finished with the example." << std::endl; std::cout << "=====================================================\n"; activemq::library::ActiveMQCPP::shutdownLibrary(); } ...what I also dont understand is why "sendReply(message);" is called AFTER I pressed a key ? ... and then I get this error on the console: ----------------------------------------------------- Finished with the example. ===================================================== simple_async_consumer(1639,0xa0738720) malloc: *** error for object 0x106a98: Non-aligned pointer being freed *** set a breakpoint in malloc_error_break to debug -- View this message in context: http://www.nabble.com/ActiveMQ-c%2B%2B-and-Java-tp25190274p25204452.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.