Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 50887 invoked from network); 31 Jul 2006 09:38:13 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 31 Jul 2006 09:38:13 -0000 Received: (qmail 62411 invoked by uid 500); 31 Jul 2006 09:38:13 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 62371 invoked by uid 500); 31 Jul 2006 09:38:13 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 62358 invoked by uid 99); 31 Jul 2006 09:38:13 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 31 Jul 2006 02:38:13 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 31 Jul 2006 02:38:06 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 750E61A9823; Mon, 31 Jul 2006 02:37:46 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r427057 [4/22] - in /incubator/activemq/trunk/amazon: ./ amq_brokersession/ amq_corelib/ amq_examples/ amq_examples/bs_async_recv/ amq_examples/bs_send/ amq_examples/bs_sync_recv/ amq_examples/cl_send/ amq_transport/ command/ marshal/ Date: Mon, 31 Jul 2006 09:36:51 -0000 To: activemq-commits@geronimo.apache.org From: jstrachan@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060731093746.750E61A9823@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Added: incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl.cpp URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl.cpp?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl.cpp (added) +++ incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl.cpp Mon Jul 31 02:36:40 2006 @@ -0,0 +1,780 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#include +#include + +#include "RCSID.h" +#include +#include +#include +#include +#include +#include + +#include "TextMessage.h" +#include "BytesMessage.h" + +#include "PrimitiveMap.h" +#include "MessageConsumer.h" +#include "NonBlockingMessageConsumer.h" +#include "BlockingMessageConsumer.h" +#include "NonBlockingMessageConsumerRef.h" +#include "BlockingMessageConsumerRef.h" +#include "Exception.h" +#include "Destination.h" + +#include "CoreLibImpl_.h" + +#include "StompMessage.h" +#include "NullLogger.h" + +#include "command/CommandTypes.h" +#include "command/ConnectionInfo.h" +#include "command/SessionInfo.h" +#include "command/ConsumerInfo.h" +#include "command/WireFormatInfo.h" +#include "command/ProducerInfo.h" +#include "command/ShutdownInfo.h" +#include "command/ActiveMQTopic.h" +#include "command/ActiveMQTempTopic.h" +#include "command/ActiveMQTempQueue.h" +#include "command/ActiveMQQueue.h" +#include "command/ActiveMQTextMessage.h" +#include "command/ActiveMQBytesMessage.h" +#include "command/ActiveMQDestination.h" +#include "command/MessageDispatch.h" +#include "command/Response.h" +#include "command/BrokerInfo.h" +#include "command/RemoveInfo.h" +#include "command/DestinationInfo.h" +#include "command/MessageAck.h" +#include "marshal/BaseDataStreamMarshaller.h" +#include "marshal/ProtocolFormat.h" +#include "marshal/MarshallerFactory.h" +#include "marshal/BufferWriter.h" +#include "marshal/BufferReader.h" + +using namespace ActiveMQ; +using namespace std; +using boost::shared_ptr; + +RCSID(CoreLibImpl, "$Id$"); + +#define FATAL(msg) \ + do { if (logger_.get() && logger_->isEnabled(LogLevel::Fatal)) \ + logger_->logFatal(msg); } while(0) + +#define ERROR(msg) \ + do { if (logger_.get() && logger_->isEnabled(LogLevel::Error)) \ + logger_->logError(msg); } while(0) + +#define WARNING(msg) \ + do { if (logger_.get() && logger_->isEnabled(LogLevel::Warning)) \ + logger_->logWarning(msg); } while(0) + +#define INFORM(msg) \ + do { if (logger_.get() && logger_->isEnabled(LogLevel::Inform)) \ + logger_->logInform(msg); } while(0) + +#define DEBUG(msg) \ + do { if (logger_.get() && logger_->isEnabled(LogLevel::Debug)) \ + logger_->logDebug(msg); } while(0) + +const static int OPENWIRE_VERSION = 1; +const static int PREFETCH_SIZE = 32766; +const static int SESSION_ID = 12345; + +const static int STANDARD_ACK = 2; + +const static int ADD_DEST = 0; +const static int REMOVE_DEST = 1; + +const static UUIDGenerator uuidgen; + +const static string activemq = "ActiveMQ"; + +CoreLibImpl::CoreLibImpl(CoreLib *parent, + const string& user, + const string& password) : + parent_(parent), + user_(user), + password_(password), + sizeBufPos_(0), + inMessage_(false), + yetToRecv_(0), + sessionId_(SESSION_ID), + nextCommandId_(0), + nextProducerId_(0), + nextConsumerId_(0), + nextTempDestId_(0), + logger_(new NullLogger()), + initialized_(false) { + + Marshalling::MarshallerFactory::configure(pf_); + clientId_ = uuidgen.getGuid(); + connectionId_ = uuidgen.getGuid(); +} + +void +CoreLibImpl::marshalPending_(Buffer& b) { + if (!(pendingBuffer_.empty())) { + b.insert(b.end(), pendingBuffer_.begin(), pendingBuffer_.end()); + pendingBuffer_.resize(0); + } +} + +void +CoreLibImpl::marshalCommand_(Command::BaseCommand& o, Buffer& b) { + o.setCommandId(nextCommandId_++); + marshalCommand_(static_cast(o), b); +} + +void loosemarshalcommand(Command::WireFormatInfo& wfi, Buffer& b) { + size_t length = htonl(100); + int openwire_version = htonl(OPENWIRE_VERSION); + // size + //b.insert(b.end(), (uint8_t *)(&length), (uint8_t *)(&length) + 4); + // type + b.push_back(1); + // magic + b.push_back('A'); + b.push_back('c'); + b.push_back('t'); + b.push_back('i'); + b.push_back('v'); + b.push_back('e'); + b.push_back('M'); + b.push_back('Q'); + // openwire version + b.insert(b.end(), (uint8_t *)(&openwire_version), (uint8_t *)(&openwire_version) + 4); + // marshalled properties + // true it's there + b.push_back(1); + size_t mplen = htonl(wfi.getMarshalledProperties().size()); + // length + b.insert(b.end(), (uint8_t *)(&mplen), (uint8_t *)(&mplen) + 4); + // data + b.insert(b.end(), wfi.getMarshalledProperties().begin(), wfi.getMarshalledProperties().end()); + + length = htonl(1 + 8 + 4 + 1 + 4 + ntohl(mplen)); + b.insert(b.begin(), (uint8_t *)(&length), (uint8_t *)(&length) + 4); + +} + +void +CoreLibImpl::marshalCommand_(Command::AbstractCommand& o, Buffer& b) { + IO::BooleanStream bs; + + // get the marshaller + Marshalling::BaseDataStreamMarshaller* marshaller = pf_.getMarshaller(o.getCommandType()); + if (marshaller == NULL) { + stringstream exmsg("Invalid command type ("); + exmsg << o.getCommandType(); + exmsg << ") passed to marshalCommand"; + throw Exception(exmsg.str()); + } + + // first pass gets the size and writes the flags bitset + uint32_t size = marshaller->marshal1(pf_, o, bs); + size = htonl(size + bs.getMarshalledSize()); + + // first 4 bytes of a message are its size + b.insert(b.end(), (uint8_t *)(&size), (uint8_t *)(&size) + 4); + + // then one byte that is the command type + b.push_back(o.getCommandType()); + + IO::BufferWriter bw(b); + // then the bitset + bs.marshal(bw); + + // then the second marshalling pass which actually writes the data out + marshaller->marshal2(pf_, o, bw, bs); +} + +void +CoreLibImpl::unmarshalCommand_(Command::AbstractCommand& o, const Buffer& b) { + IO::BufferReader br(b); + IO::BooleanStream bs(br); + + Marshalling::BaseDataStreamMarshaller *marshaller = pf_.getMarshaller(o.getCommandType()); + if (marshaller == NULL) { + stringstream exmsg("Invalid command type ("); + exmsg << o.getCommandType(); + exmsg << ") received from broker"; + throw Exception(exmsg.str()); + } + marshaller->unmarshal(pf_, o, br, bs); +} + +void +CoreLibImpl::initialize(Buffer& b) { + // The first step is wire format negotiation - though it's not really - + // see https://issues.apache.org/activemq/browse/AMQ-681 + + // In any case, the first packet sent is a WireFormatInfo packet + + Command::WireFormatInfo wfi; + + // It has a magic string prefix + vector magic(activemq.begin(), activemq.end()); + wfi.setMagic(magic); + + // OpenWire version + wfi.setVersion(OPENWIRE_VERSION); + + // We want "tight" encoding, which is length-prefixed + PrimitiveMap optionMap; + optionMap.putBoolean("TightEncodingEnabled", true); + + Buffer marshalledOptions; + optionMap.marshal(marshalledOptions); + + wfi.setMarshalledProperties(marshalledOptions); + +// marshalCommand_(wfi, b); + loosemarshalcommand(wfi, b); + + // The next command sets up our JMS Connection + + Command::ConnectionInfo cinfo; + cinfo.setUserName(user_); + cinfo.setPassword(password_); + cinfo.setClientId(clientId_); + + shared_ptr cid(new Command::ConnectionId()); + cid->setValue(connectionId_); + cinfo.setConnectionId(cid); + + marshalCommand_(cinfo, b); + + // Now we set up the JMS Session + + Command::SessionInfo sinfo; + + shared_ptr sid(new Command::SessionId()); + sid->setConnectionId(connectionId_); + sid->setValue(sessionId_); + + sinfo.setSessionId(sid); + + marshalCommand_(sinfo, b); + + initialized_ = true; +} + +void +CoreLibImpl::publish(const Message& m, Buffer& b) { + publish(m.getDestination(), m, b); +} + +void +CoreLibImpl::publish(const Destination& d, + const Message& m, + Buffer &b) { + + // Offload any pending data now that we have the opportunity + marshalPending_(b); + + shared_ptr dest(d.createCommandInstance().release()); + + // Create our producer if we haven't already + if (producerId_.get() == NULL) { + Command::ProducerInfo pi; + + shared_ptr pid(new Command::ProducerId()); + pid->setConnectionId(connectionId_); + pid->setSessionId(sessionId_); + pid->setValue(nextProducerId_++); + producerId_ = pid; + + pi.setProducerId(producerId_); + marshalCommand_(pi, b); + } + + auto_ptr message; + + if (m.getType() == Command::Types::ACTIVEMQ_TEXT_MESSAGE) + message.reset(new Command::ActiveMQTextMessage()); + else if (m.getType() == Command::Types::ACTIVEMQ_BYTES_MESSAGE) + message.reset(new Command::ActiveMQBytesMessage()); + + message->setProducerId(producerId_); + message->setDestination(dest); + message->setPersistent(false); + + if (!(m.getReplyTo().getName().empty())) { + shared_ptr replyTo(m.getReplyTo().createCommandInstance().release()); + message->setReplyTo(replyTo); + } + + shared_ptr mid(new Command::MessageId()); + + mid->setProducerId(producerId_); + message->setMessageId(mid); + + Buffer content; + // serialize the message into the buffer + m.marshall(content); + + message->setContent(content); + + marshalCommand_(*message, b); +} + +void +CoreLibImpl::disconnect(Buffer& b) { + marshalPending_(b); + + // remove the sessionid + + Command::RemoveInfo r; + shared_ptr sidptr(new Command::SessionId()); + sidptr->setValue(sessionId_); + sidptr->setConnectionId(connectionId_); + r.setObjectId(sidptr); + marshalCommand_(r, b); + + // remove the connectionid + + Command::RemoveInfo r2; + shared_ptr cidptr(new Command::ConnectionId()); + cidptr->setValue(connectionId_); + r2.setObjectId(cidptr); + marshalCommand_(r2, b); + + // send shutdowninfo + + Command::ShutdownInfo si; + marshalCommand_(si, b); + initialized_ = false; +} + +CoreLibImpl::~CoreLibImpl() { + for (list::iterator i = consumerRefs_.begin(); + i != consumerRefs_.end(); ++i) { + (*i)->invalidate(); + deregisterRef(*i); + } + for (list::iterator i = allRegisteredDestinations_.begin(); + i != allRegisteredDestinations_.end(); ++i) { + const_cast(*i)->invalidate(); + } +} + +void +CoreLibImpl::subscribe(const Destination& d, MessageConsumerRef& q, Buffer& b) { + marshalPending_(b); + + MessageConsumer *mcptr = q.getConsumer(); + + destinationMaps_.insert(pair(d, mcptr)); + + if (consumerIds_.count(d) == 0) { + // make a new consumer id if there isn't one for this destination + shared_ptr cid(new Command::ConsumerId()); + cid->setConnectionId(connectionId_); + cid->setSessionId(sessionId_); + cid->setValue(nextConsumerId_++); + consumerIds_[d] = cid; + } + + Command::ConsumerInfo si; + shared_ptr dest(d.createCommandInstance().release()); + + si.setConsumerId(consumerIds_[d]); + si.setDestination(dest); + si.setPrefetchSize(PREFETCH_SIZE); + si.setDispatchAsync(true); + + marshalCommand_(si, b); +} + +void +CoreLibImpl::unsubscribe(const Destination& d, Buffer& b) { + marshalPending_(b); + + map::iterator i = destinationMaps_.find(d); + if (i == destinationMaps_.end()) + throw Exception("Not subscribed to destination " + d.getName()); + i->second->removeQueued(d); + + if (consumerIds_.count(d) != 0) { + + // Remove the consumer for this destination + + map >::iterator di = consumerIds_.find(d); + shared_ptr ourcid(di->second); + Command::RemoveInfo ri; + ri.setObjectId(ourcid); + + marshalCommand_(ri, b); + + consumerIds_.erase(di); + } +} + +void +CoreLibImpl::handleData(const Buffer& incoming, Buffer& b) { + handleData(&(incoming.operator[](0)), incoming.size(), b); +} + +void +CoreLibImpl::unmarshalBuffer(vector& buf, Buffer& b) { + int type = buf[0]; + + buf.erase(buf.begin()); + + switch (type) { + case Command::Types::BROKER_INFO: { + // BrokerInfo + // Ignore it, nothing to really do with it + } + break; + case Command::Types::SHUTDOWN_INFO: { + // ShutdownInfo, broker is exiting + Command::ShutdownInfo s; + + unmarshalCommand_(s, buf); + + INFORM("Got ShutdownInfo, shutting down"); + disconnect(b); + } + break; + case Command::Types::MESSAGE_DISPATCH: { + // MessageDispatch, new message arriving + Command::MessageDispatch md; + + unmarshalCommand_(md, buf); + + shared_ptr m = md.getMessage(); + if (m.get() != NULL) { + shared_ptr dest = m->getDestination(); + if (dest.get() != NULL) { + + // Ack the message + + Command::MessageAck ack; + + auto_ptr destToAck; + Destination myd; + + switch(dest->getCommandType()) { + case Command::Types::ACTIVEMQ_TOPIC: + destToAck.reset(new Command::ActiveMQTopic()); + myd = Destination(parent_, dest->getPhysicalName(), false /* isTemp */, true /* isTopic */); + break; + case Command::Types::ACTIVEMQ_QUEUE: + destToAck.reset(new Command::ActiveMQQueue()); + myd = Destination(parent_, dest->getPhysicalName(), false /* isTemp */, false /* isTopic */); + break; + case Command::Types::ACTIVEMQ_TEMP_TOPIC: + destToAck.reset(new Command::ActiveMQTempTopic()); + myd = Destination(parent_, dest->getPhysicalName(), true /* isTemp */, true /* isTopic */); + break; + case Command::Types::ACTIVEMQ_TEMP_QUEUE: + destToAck.reset(new Command::ActiveMQTempQueue()); + myd = Destination(parent_, dest->getPhysicalName(), true /* isTemp */, false /* isTopic */); + break; + }; + + destToAck->setPhysicalName(dest->getPhysicalName()); + + shared_ptr cidToAck(consumerIds_[myd]); + + ack.setConsumerId(cidToAck); + ack.setDestination(shared_ptr(destToAck)); + ack.setAckType(STANDARD_ACK); // tells broker to discard the message + ack.setMessageCount(1); + marshalCommand_(ack, b); + + const string& ourdestname(dest->getPhysicalName()); + + map::iterator i = destinationMaps_.find(myd); + if (i == destinationMaps_.end()) + WARNING("No MessageConsumer registered for received message on destination " + ourdestname); + else { + int mtype = m->getCommandType(); + auto_ptr toenqueue; + switch (mtype) { + case Command::Types::ACTIVEMQ_TEXT_MESSAGE: + toenqueue.reset(new TextMessage(m->getContent())); + break; + case Command::Types::ACTIVEMQ_BYTES_MESSAGE: + toenqueue.reset(new BytesMessage(m->getContent())); + break; + default: + WARNING("Unknown/unimplemented message command type"); + return; + }; + + toenqueue->setDestination(myd); + + if (m->getReplyTo() != NULL) { + const bool rtoIsTemp = m->getReplyTo()->getCommandType() == Command::Types::ACTIVEMQ_TEMP_QUEUE || + m->getReplyTo()->getCommandType() == Command::Types::ACTIVEMQ_TEMP_TOPIC; + + const bool rtoIsTopic = m->getReplyTo()->getCommandType() == Command::Types::ACTIVEMQ_TEMP_TOPIC || + m->getReplyTo()->getCommandType() == Command::Types::ACTIVEMQ_TOPIC; + + Destination rto(parent_, m->getReplyTo()->getPhysicalName(), rtoIsTemp, rtoIsTopic); + + toenqueue->setReplyTo(rto); + } + + i->second->enqueue(toenqueue.release()); + } + } + } + } + break; + } +} + +void +CoreLibImpl::handleData(const uint8_t *buf, size_t len, Buffer& b) { + if (!inMessage_) { + uint32_t msgsize = 0; + if (sizeBufPos_ == 0) { + if (len >= 4) { + msgsize = ntohl(*(uint32_t*)(buf)); + buf += 4; + len -= 4; + } + else { + memcpy(sizeBuf_, buf, len); + sizeBufPos_ += len; + return; + } + } + else { + if (len + sizeBufPos_ >= 4) { + memcpy(sizeBuf_ + sizeBufPos_, buf, 4 - sizeBufPos_); + msgsize = ntohl(*(uint32_t*)(sizeBuf_)); + buf += (4 - sizeBufPos_); + len -= (4 - sizeBufPos_); + } + else { + memcpy(sizeBuf_ + sizeBufPos_, buf, len); + sizeBufPos_ += len; + return; + } + } + + if (msgsize == 0) + throw Exception("Zero-length message - corrupt data stream"); + + unmarshalBuffer_.resize(0); + unmarshalBuffer_.reserve(msgsize); + yetToRecv_ = msgsize; + inMessage_ = true; + sizeBufPos_ = 0; + } + + // does this incoming message fill or exceed the buffer? + + if (len == yetToRecv_) { // exact match + inMessage_ = false; + unmarshalBuffer_.insert(unmarshalBuffer_.end(), buf, buf + len); + unmarshalBuffer(unmarshalBuffer_, b); + } + else if (len < yetToRecv_) { // not yet + unmarshalBuffer_.insert(unmarshalBuffer_.end(), buf, buf + len); + yetToRecv_ -= len; + } + else if (len > yetToRecv_) { // too much + inMessage_ = false; + size_t excess = len - yetToRecv_; // number of excess bytes past the current message + size_t thismsg = len - excess; // number of bytes of this buffer that are the current message + unmarshalBuffer_.insert(unmarshalBuffer_.end(), buf, buf + thismsg); + unmarshalBuffer(unmarshalBuffer_, b); + handleData(buf + thismsg, excess, b); + } +} + +BlockingMessageConsumerRef +CoreLibImpl::newBlockingMessageConsumer(void) { + BlockingMessageConsumer *q = new BlockingMessageConsumer(); + consumers_.push_back(q); + refCounts_[q] = 0; + return BlockingMessageConsumerRef(parent_, q); +} + +NonBlockingMessageConsumerRef +CoreLibImpl::newNonBlockingMessageConsumer(void) { + NonBlockingMessageConsumer *q = new NonBlockingMessageConsumer(); + consumers_.push_back(q); + refCounts_[q] = 0; + return NonBlockingMessageConsumerRef(parent_, q); +} + +void +CoreLibImpl::registerRef(MessageConsumerRef *q) { + if (NULL == q) + return; + + MessageConsumer *mq = q->getConsumer(); + consumerRefs_.push_back(q); + ++refCounts_[mq]; +} + +void +CoreLibImpl::deregisterRef(MessageConsumerRef *q) { + + if (NULL == q) + return; + MessageConsumer *mq = q->getConsumer(); + int count = --refCounts_[mq]; + if (count == 0) { + refCounts_.erase(mq); + // delete all references to the consumer + for(map::iterator i = destinationMaps_.begin(); + i != destinationMaps_.end();) { + if (i->second == mq) + destinationMaps_.erase(i++); + else + ++i; + } + consumers_.remove(mq); + // delete the consumer + delete mq; + } + consumerRefs_.remove(q); +} + +void +CoreLibImpl::setLogger(auto_ptr lgr) { + logger_ = lgr; +} + +Logger& +CoreLibImpl::getLogger() { + return *logger_; +} + +void +CoreLibImpl::registerDest(const Destination& d) { + map::iterator i = destRefCounts_.find(d.toString()); + if (i == destRefCounts_.end()) + destRefCounts_.insert(pair(d.toString(), 1)); + else + i->second++; + + allRegisteredDestinations_.push_back(&d); +} + +void +CoreLibImpl::unregisterDest(const Destination& d) { + map::iterator i = destRefCounts_.find(d.toString()); + if (i == destRefCounts_.end()) + return; + + allRegisteredDestinations_.remove(&d); + + if (i->second == 1) { + // This is the last reference to this destination + + map >::iterator cii = consumerIds_.find(d); + // delete the relevant consumer, if there is one + if (cii != consumerIds_.end()) { + shared_ptr ci(cii->second); + Command::RemoveInfo r; + r.setObjectId(ci); + marshalCommand_(r, pendingBuffer_); + + consumerIds_.erase(cii); + } + + // delete the destination, if it's our responsibility + + // we know we created a temporary destination if our connection ID is in it + if (d.isTemporary() && d.getName().find(connectionId_) != string::npos) { + Command::DestinationInfo di; + + shared_ptr cid(new Command::ConnectionId()); + cid->setValue(connectionId_); + shared_ptr dest(d.createCommandInstance().release()); + di.setDestination(dest); + di.setOperationType(REMOVE_DEST); + di.setConnectionId(cid); + + marshalCommand_(di, pendingBuffer_); + } + destRefCounts_.erase(i); + } + else + i->second--; +} + +Destination +CoreLibImpl::createTemporaryTopic() { + stringstream str; + str << connectionId_ << ":" << nextTempDestId_++; + Destination ret(parent_, str.str(), true /* isTemp */, true /* isTopic */); + + Command::DestinationInfo di; + + shared_ptr cid(new Command::ConnectionId()); + cid->setValue(connectionId_); + + shared_ptr dest(new Command::ActiveMQTempTopic()); + dest->setPhysicalName(ret.getName()); + + di.setDestination(dest); + di.setOperationType(ADD_DEST); + di.setConnectionId(cid); + + marshalCommand_(di, pendingBuffer_); + + return ret; +} + +Destination +CoreLibImpl::createTemporaryQueue() { + stringstream str; + str << connectionId_ << ":" << nextTempDestId_++; + Destination ret(parent_, str.str(), true /* isTemp */, false /* isTopic */); + + Command::DestinationInfo di; + + shared_ptr cid(new Command::ConnectionId()); + cid->setValue(connectionId_); + + shared_ptr dest(new Command::ActiveMQTempQueue()); + dest->setPhysicalName(ret.getName()); + + di.setDestination(dest); + di.setOperationType(ADD_DEST); + di.setConnectionId(cid); + + marshalCommand_(di, pendingBuffer_); + + return ret; +} + +Destination +CoreLibImpl::createTopic(const std::string& name) { + return Destination(parent_, name, false /* isTemp */, true /* isTopic */); +} + +Destination +CoreLibImpl::createQueue(const std::string& name) { + return Destination(parent_, name, false /* isTemp */, false /* isTopic */); +} Propchange: incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl.cpp ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Added: incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl_.h URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl_.h?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl_.h (added) +++ incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl_.h Mon Jul 31 02:36:40 2006 @@ -0,0 +1,150 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#ifndef ACTIVEMQ_CORELIB_IMPL_H +#define ACTIVEMQ_CORELIB_IMPL_H + +#include +#include +#include +#include + +#include + +#include "Destination.h" + +#include "Message.h" + +#include "Buffer.h" +#include "BlockingMessageConsumerRef.h" +#include "NonBlockingMessageConsumerRef.h" +#include "Logger.h" +#include "UUIDGenerator.h" + +#include "marshal/ProtocolFormat.h" + +namespace ActiveMQ { + namespace Command { + class BaseCommand; + class AbstractCommand; + class ProducerId; + class ConsumerId; + }; + + class CoreLib; + /// Private implementation class for CoreLib + class CoreLibImpl { + public: + virtual ~CoreLibImpl(); + private: + friend class CoreLib; + CoreLibImpl(CoreLib *parent, + const std::string& user, + const std::string& password); + void initialize(Buffer& b); + void disconnect(Buffer& b); + void publish(const Message& msg, Buffer& b); + void publish(const Destination& dest, + const Message& msg, + Buffer& b); + void subscribe(const Destination& dest, + MessageConsumerRef& q, + Buffer& b); + void unsubscribe(const Destination& dest, + Buffer& b); + void handleData(const Buffer& incoming, Buffer& b); + void handleData(const uint8_t *buf, size_t len, Buffer& b); + NonBlockingMessageConsumerRef newNonBlockingMessageConsumer(); + BlockingMessageConsumerRef newBlockingMessageConsumer(); + void setLogger(std::auto_ptr lgr); + Logger& getLogger(); + Destination createTemporaryTopic(); + Destination createTemporaryQueue(); + Destination createTopic(const std::string& name); + Destination createQueue(const std::string& name); + void registerDest(const Destination& d); + void unregisterDest(const Destination& d); + + CoreLibImpl(const CoreLibImpl &); + CoreLibImpl& operator=(const CoreLibImpl &); + + CoreLib *parent_; // for passing to new MessageConsumerRef objects and destinations + + const std::string user_; + const std::string password_; + std::map destinationMaps_; + std::list consumers_; + std::list consumerRefs_; + std::map refCounts_; + + // Destinations are reference-counted + std::map destRefCounts_; + std::list allRegisteredDestinations_; + + // State of asynchronous message unmarshalling + std::vector unmarshalBuffer_; + uint8_t sizeBuf_[4]; + uint8_t sizeBufPos_; + + // Operations that don't have a way to pass up outgoing data + // can use the pending buffer. + std::vector pendingBuffer_; + void marshalPending_(Buffer& b); + + // Unmarshals a full buffer containing one message + void unmarshalBuffer(std::vector& buf, Buffer& b); + // internal data used to track message unmarshalling state + bool inMessage_; + size_t yetToRecv_; + + Marshalling::ProtocolFormat pf_; + + // We only keep one connection / session open for the entire time + int64_t sessionId_; + std::string clientId_; + std::string connectionId_; + // Only one JMS producer is used for all outgoing messages + boost::shared_ptr producerId_; + + // subject -> consumer id mapping + std::map > consumerIds_; + + // keep track of ids to use in allocation of new objects + int nextCommandId_; + int nextProducerId_; + int nextConsumerId_; + int nextTempDestId_; + + // Marshal a command into a buffer + void marshalCommand_(Command::AbstractCommand& o, Buffer& b); + void marshalCommand_(Command::BaseCommand& o, Buffer& b); + + // Unmarshal a command from a buffer + void unmarshalCommand_(Command::AbstractCommand& o, const Buffer& b); + + std::auto_ptr logger_; + + void registerRef(MessageConsumerRef *mc); + void deregisterRef(MessageConsumerRef *mc); + + bool initialized_; + }; +}; + +#endif // ACTIVEMQ_CORELIB_IMPL_H Propchange: incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl_.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl_.h ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Added: incubator/activemq/trunk/amazon/amq_corelib/Destination.cpp URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Destination.cpp?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/Destination.cpp (added) +++ incubator/activemq/trunk/amazon/amq_corelib/Destination.cpp Mon Jul 31 02:36:40 2006 @@ -0,0 +1,120 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#include + +#include "Destination.h" +#include "CoreLib.h" + +#include "command/ActiveMQTempTopic.h" +#include "command/ActiveMQTempQueue.h" +#include "command/ActiveMQQueue.h" +#include "command/ActiveMQTopic.h" + +#include +#include + +using namespace ActiveMQ; +using namespace std; + +Destination::Destination() : + name_(""), + cl_(NULL), + temporary_(false), + isTopic_(false) + {} + +Destination::Destination(CoreLib *corelib, const std::string& name, bool isTemp, bool isTopic) : + name_(name), + cl_(corelib), + temporary_(isTemp), + isTopic_(isTopic) { + + stringstream str; + str << name_; + str << ":isTemporary=" << temporary_; + str << ":isTopic=" << isTopic_; + string_.assign(str.str()); + + assert(cl_ != NULL); + cl_->registerDest(*this); +} + +Destination::Destination(const Destination& oth) : + name_(oth.name_), + cl_(oth.cl_), + temporary_(oth.temporary_), + isTopic_(oth.isTopic_), + string_(oth.string_) +{ + if (cl_ != NULL) { + cl_->registerDest(*this); + } +} + +Destination& +Destination::operator=(const Destination& oth) { + if (&oth != this) { + if (cl_ != NULL) { + cl_->unregisterDest(*this); + } + cl_ = oth.cl_; + name_ = oth.name_; + temporary_ = oth.temporary_; + isTopic_ = oth.isTopic_; + string_.assign(oth.string_); + if (cl_ != NULL) { + cl_->registerDest(*this); + } + } + + return *this; +} + +void +Destination::invalidate() { + cl_ = NULL; +} + +Destination::~Destination() { + if (cl_ != NULL) { + cl_->unregisterDest(*this); + } +} + +auto_ptr +Destination::createCommandInstance() const { + auto_ptr ret; + + if (isTemporary()) { + if (isTopic()) + ret.reset(new Command::ActiveMQTempTopic()); + else + ret.reset(new Command::ActiveMQTempQueue()); + } + else { + if (isTopic()) + ret.reset(new Command::ActiveMQTopic()); + else + ret.reset(new Command::ActiveMQQueue()); + } + + ret->setPhysicalName(name_); + return ret; +} Propchange: incubator/activemq/trunk/amazon/amq_corelib/Destination.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/Destination.cpp ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Added: incubator/activemq/trunk/amazon/amq_corelib/Destination.h URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Destination.h?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/Destination.h (added) +++ incubator/activemq/trunk/amazon/amq_corelib/Destination.h Mon Jul 31 02:36:40 2006 @@ -0,0 +1,81 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#ifndef ACTIVEMQ_DESTINATION_H +#define ACTIVEMQ_DESTINATION_H + +#include +#include + +#include "command/ActiveMQDestination.h" + +namespace ActiveMQ { + class CoreLib; + class CoreLibImpl; + + /// Message destination + /** + This class holds an ActiveMQ "destination." It can be either a + JMS Topic or Queue, or temporary versions of either of those. + + Destinations are always constructed from the BrokerSession or + CoreLib objects, as there is broker communication involved in + setting up and tearing down destinations. + + @version $Id$ + */ + class Destination { + public: + /// Gets the name of the destination + const std::string& getName() const { return name_; } + + /// Gets a normalized descriptive string of the destination + std::string toString() const { return string_; } + + /// indicates whether or not the destination is temporary + bool isTemporary() const { return temporary_; } + + /// indicates whether or not the destination is a topic + bool isTopic() const { return isTopic_; } + + Destination(); + virtual ~Destination(); + Destination(const Destination& oth); + Destination& operator=(const Destination& oth); + + bool operator<(const Destination& oth) const { return name_ < oth.name_; } + + bool operator==(const Destination& oth) const { return name_ == oth.name_ && temporary_ == oth.temporary_ && isTopic_ == oth.isTopic_; } + + bool operator!=(const Destination& oth) const { return !operator==(oth); } + private: + std::string name_; + CoreLib *cl_; + bool temporary_; + bool isTopic_; + std::string string_; + + friend class CoreLibImpl; + Destination(CoreLib *corelib, const std::string& name, bool isTemp, bool isTopic); + std::auto_ptr createCommandInstance() const; + void invalidate(); + }; +}; + +#endif // ACTIVEMQ_DESTINATION_H Propchange: incubator/activemq/trunk/amazon/amq_corelib/Destination.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/Destination.h ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Added: incubator/activemq/trunk/amazon/amq_corelib/Exception.h URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Exception.h?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/Exception.h (added) +++ incubator/activemq/trunk/amazon/amq_corelib/Exception.h Mon Jul 31 02:36:40 2006 @@ -0,0 +1,50 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#ifndef ACTIVEMQ_EXCEPTION_H +#define ACTIVEMQ_EXCEPTION_H + +#include +#include + +namespace ActiveMQ { + /// Messaging library exception + /** + This class represents an error inside the messaging + library. This could be the result of bad function call + arguments, or a malformed/error message received back from + the broker. + + @version $Id$ + */ + class Exception : public std::exception { + public: + Exception(const std::string& desc) : desc_(desc) {} + Exception(const char *desc) throw() : desc_(desc) {} + Exception(const Exception& oth) : desc_(oth.desc_) {} + Exception& operator=(const Exception& oth) { desc_ = oth.desc_; + return *this; } + const char *what() const throw() {return desc_.c_str();} + virtual ~Exception() throw() {} + private: + std::string desc_; + }; +}; + +#endif // ACTIVEMQ_EXCEPTION_H Propchange: incubator/activemq/trunk/amazon/amq_corelib/Exception.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/Exception.h ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Added: incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.cpp URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.cpp?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.cpp (added) +++ incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.cpp Mon Jul 31 02:36:40 2006 @@ -0,0 +1,41 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#include + +#include "RCSID.h" +#include "ExceptionCallback.h" + +using namespace ActiveMQ; +using namespace std; + +RCSID(ExceptionCallback, "$Id$"); + +void +ExceptionCallback::operator()(const Exception& e) const { + if (pimpl.get()) + (*pimpl)(e); +} + +ExceptionCallback::ExceptionCallback(const ExceptionCallback& oth) { + if (oth.pimpl.get()) + pimpl.reset(oth.pimpl->clone()); + else + pimpl.reset(); +} Propchange: incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.cpp ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Added: incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.h URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.h?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.h (added) +++ incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.h Mon Jul 31 02:36:40 2006 @@ -0,0 +1,70 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#ifndef ACTIVEMQ_EXCEPTIONCALLBACK_H +#define ACTIVEMQ_EXCEPTIONCALLBACK_H + +#include + +#include "Exception.h" + +namespace ActiveMQ { + /// Callback for BrokerSession exceptions. + /** + This class represents a function or function object that will + be called for exceptions raised in the BrokerSessions's + internal event thread. You shouldn't need to construct it + yourself - it will be implicitly constructed from any function + or function object that matches the void (*)(const Exception &) + signature. + + @version $Id$ + */ + class ExceptionCallback { + private: + struct Impl { + virtual void operator()(const Exception& msg) const = 0; + virtual Impl *clone() const = 0; + virtual ~Impl() {} + }; + + template + struct Wrapper : public Impl { + Wrapper(const T& callfunc) : f_(callfunc) {} + void operator()(const Exception& msg) const { f_(msg); } + Impl *clone() const { return new Wrapper(f_); } + virtual ~Wrapper() {} + private: + const T f_; + }; + + std::auto_ptr pimpl; + + public: + ExceptionCallback() : pimpl(NULL) {} + ExceptionCallback(const ExceptionCallback& oth); + + template + ExceptionCallback(T callfunc) : pimpl(new Wrapper(callfunc)) {} + + void operator()(const Exception& msg) const; + }; +}; + +#endif // ACTIVEMQ_EXCEPTIONCALLBACK_H Propchange: incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.h ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Added: incubator/activemq/trunk/amazon/amq_corelib/Lock.cpp URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Lock.cpp?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/Lock.cpp (added) +++ incubator/activemq/trunk/amazon/amq_corelib/Lock.cpp Mon Jul 31 02:36:40 2006 @@ -0,0 +1,55 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#include +#include + +#include "Lock.h" +#include "Exception.h" +#include "RCSID.h" + +using ActiveMQ::Lock; +using ActiveMQ::Exception; + +RCSID(Lock, "$Id$"); + +Lock::Lock(pthread_mutex_t *m) + : mutex_(m) +{ + if (NULL == m) + throw Exception("NULL mutex passed to Lock"); + int rc = pthread_mutex_lock(m); + if (rc != 0) + throw Exception(errno >= sys_nerr + ? "Unknown error" + : sys_errlist[errno]); + locked_ = true; +} + +void +Lock::unlock() { + if (locked_) { + pthread_mutex_unlock(mutex_); + locked_ = false; + } +} + +Lock::~Lock() { + unlock(); +} Propchange: incubator/activemq/trunk/amazon/amq_corelib/Lock.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/Lock.cpp ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Added: incubator/activemq/trunk/amazon/amq_corelib/Lock.h URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Lock.h?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/Lock.h (added) +++ incubator/activemq/trunk/amazon/amq_corelib/Lock.h Mon Jul 31 02:36:40 2006 @@ -0,0 +1,47 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#ifndef ACTIVEMQ_LOCK_H +#define ACTIVEMQ_LOCK_H + +#include + +namespace ActiveMQ { + /// RAII lock class, used internally + /** + This class holds a lock for its lifetime. The constructor + acquires the lock, and the destructor releases it. This is to + allow exception-safe locking (the lock will be released during + stack unwind). + + @version $Id$ + */ + class Lock { + public: + Lock(pthread_mutex_t *m); + /// Explicit unlock for where implicit unlocking is inconvenient. + void unlock(); + ~Lock(); + private: + pthread_mutex_t *mutex_; + bool locked_; + }; +}; + +#endif // ACTIVEMQ_LOCK_H Propchange: incubator/activemq/trunk/amazon/amq_corelib/Lock.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/Lock.h ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Added: incubator/activemq/trunk/amazon/amq_corelib/LogLevel.cpp URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/LogLevel.cpp?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/LogLevel.cpp (added) +++ incubator/activemq/trunk/amazon/amq_corelib/LogLevel.cpp Mon Jul 31 02:36:40 2006 @@ -0,0 +1,52 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#include "LogLevel.h" +#include "RCSID.h" + +using ActiveMQ::LogLevel; + +RCSID(LogLevel, "$Id$"); + +static const char *loglevel_strings[] = { + "FATAL", + "ERROR", + "WARNING", + "INFORM", + "DEBUG"}; + +static const int loglevel_num = 5; + +LogLevel::LogLevel(int n) { + if (n >= loglevel_num) + desc_ = "INVALID"; + else + desc_ = loglevel_strings[n]; +} + +const char * +LogLevel::toString() const { + return desc_; +} + +const LogLevel LogLevel::Fatal = LogLevel(0); +const LogLevel LogLevel::Error = LogLevel(1); +const LogLevel LogLevel::Warning = LogLevel(2); +const LogLevel LogLevel::Inform = LogLevel(3); +const LogLevel LogLevel::Debug = LogLevel(4); Propchange: incubator/activemq/trunk/amazon/amq_corelib/LogLevel.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/LogLevel.cpp ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Added: incubator/activemq/trunk/amazon/amq_corelib/LogLevel.h URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/LogLevel.h?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/LogLevel.h (added) +++ incubator/activemq/trunk/amazon/amq_corelib/LogLevel.h Mon Jul 31 02:36:40 2006 @@ -0,0 +1,53 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#ifndef ACTIVEMQ_LOGLEVEL_H +#define ACTIVEMQ_LOGLEVEL_H + +namespace ActiveMQ { + /// Represents a (type-safe) level of logging information. + /** + @version $Id$ + */ + class LogLevel { + public: + /// FATAL log level + static const LogLevel Fatal; + + /// ERROR log level + static const LogLevel Error; + + /// WARNING log level + static const LogLevel Warning; + + /// INFORM log level + static const LogLevel Inform; + + /// DEBUG log level + static const LogLevel Debug; + + /// gets the descriptive string for a log level + const char *toString() const; + private: + LogLevel(int n); + const char *desc_; + }; +}; + +#endif // ACTIVEMQ_LOGLEVEL_H Propchange: incubator/activemq/trunk/amazon/amq_corelib/LogLevel.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/LogLevel.h ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Added: incubator/activemq/trunk/amazon/amq_corelib/Logger.h URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Logger.h?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/Logger.h (added) +++ incubator/activemq/trunk/amazon/amq_corelib/Logger.h Mon Jul 31 02:36:40 2006 @@ -0,0 +1,55 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#ifndef ACTIVEMQ_LOGGER_H +#define ACTIVEMQ_LOGGER_H + +#include + +#include "LogLevel.h" + +namespace ActiveMQ { + /// Log event handler interface + /** + This class represents a handler for log events. It is a way + for the library to access your local logging policy. + */ + class Logger { + public: + /// Test for a log level being enabled + virtual bool isEnabled(const LogLevel& level) = 0; + + /// Log at FATAL + virtual void logFatal(const std::string& msg) = 0; + + /// Log at ERROR + virtual void logError(const std::string& msg) = 0; + + /// Log at WARNING + virtual void logWarning(const std::string& msg) = 0; + + /// Log at INFORM + virtual void logInform(const std::string& msg) = 0; + + /// Log at DEBUG + virtual void logDebug(const std::string& msg) = 0; + }; +}; + +#endif // ACTIVEMQ_LOGGER_H Propchange: incubator/activemq/trunk/amazon/amq_corelib/Logger.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/Logger.h ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Added: incubator/activemq/trunk/amazon/amq_corelib/Makefile.am URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Makefile.am?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/Makefile.am (added) +++ incubator/activemq/trunk/amazon/amq_corelib/Makefile.am Mon Jul 31 02:36:40 2006 @@ -0,0 +1,6 @@ +INCLUDES = -I../ -I../marshal +lib_LTLIBRARIES = libamq_corelib.la +libamq_corelib_la_SOURCES = BlockingMessageConsumer.cpp BlockingMessageConsumerRef.cpp CoreLib.cpp CoreLibImpl.cpp ExceptionCallback.cpp Lock.cpp LogLevel.cpp MessageConsumer.cpp MessageConsumerRef.cpp NonBlockingMessageConsumer.cpp NonBlockingMessageConsumerRef.cpp StompMessage.cpp TextMessage.cpp UUIDGenerator.cpp Destination.cpp PrimitiveMap.cpp Sem.cpp +libamq_corelib_la_LIBADD = -L../marshal -lamq_marshal -L../command -lamq_command +include_HEADERS = BlockingMessageConsumer.h BlockingMessageConsumerRef.h Buffer.h BytesMessage.h CoreLib.h CoreLibImpl_.h Destination.h ExceptionCallback.h Exception.h Lock.h Logger.h LogLevel.h MessageConsumer.h MessageConsumerRef.h Message.h NonBlockingMessageConsumer.h NonBlockingMessageConsumerRef.h NullLogger.h RCSID.h StompMessage.h TextMessage.h UUIDGenerator.h PrimitiveMap.h Sem.h + Added: incubator/activemq/trunk/amazon/amq_corelib/Makefile.in URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Makefile.in?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/Makefile.in (added) +++ incubator/activemq/trunk/amazon/amq_corelib/Makefile.in Mon Jul 31 02:36:40 2006 @@ -0,0 +1,495 @@ +# Makefile.in generated by automake 1.9.6 from Makefile.am. +# @configure_input@ + +# Copyright (C) 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, +# 2003, 2004, 2005 Free Software Foundation, Inc. +# This Makefile.in is free software; the Free Software Foundation +# gives unlimited permission to copy and/or distribute it, +# with or without modifications, as long as this notice is preserved. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY, to the extent permitted by law; without +# even the implied warranty of MERCHANTABILITY or FITNESS FOR A +# PARTICULAR PURPOSE. + +@SET_MAKE@ + + +srcdir = @srcdir@ +top_srcdir = @top_srcdir@ +VPATH = @srcdir@ +pkgdatadir = $(datadir)/@PACKAGE@ +pkglibdir = $(libdir)/@PACKAGE@ +pkgincludedir = $(includedir)/@PACKAGE@ +top_builddir = .. +am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd +INSTALL = @INSTALL@ +install_sh_DATA = $(install_sh) -c -m 644 +install_sh_PROGRAM = $(install_sh) -c +install_sh_SCRIPT = $(install_sh) -c +INSTALL_HEADER = $(INSTALL_DATA) +transform = $(program_transform_name) +NORMAL_INSTALL = : +PRE_INSTALL = : +POST_INSTALL = : +NORMAL_UNINSTALL = : +PRE_UNINSTALL = : +POST_UNINSTALL = : +build_triplet = @build@ +host_triplet = @host@ +subdir = amq_corelib +DIST_COMMON = $(include_HEADERS) $(srcdir)/Makefile.am \ + $(srcdir)/Makefile.in +ACLOCAL_M4 = $(top_srcdir)/aclocal.m4 +am__aclocal_m4_deps = $(top_srcdir)/configure.in +am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \ + $(ACLOCAL_M4) +mkinstalldirs = $(install_sh) -d +CONFIG_CLEAN_FILES = +am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`; +am__vpath_adj = case $$p in \ + $(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \ + *) f=$$p;; \ + esac; +am__strip_dir = `echo $$p | sed -e 's|^.*/||'`; +am__installdirs = "$(DESTDIR)$(libdir)" "$(DESTDIR)$(includedir)" +libLTLIBRARIES_INSTALL = $(INSTALL) +LTLIBRARIES = $(lib_LTLIBRARIES) +libamq_corelib_la_DEPENDENCIES = +am_libamq_corelib_la_OBJECTS = BlockingMessageConsumer.lo \ + BlockingMessageConsumerRef.lo CoreLib.lo CoreLibImpl.lo \ + ExceptionCallback.lo Lock.lo LogLevel.lo MessageConsumer.lo \ + MessageConsumerRef.lo NonBlockingMessageConsumer.lo \ + NonBlockingMessageConsumerRef.lo StompMessage.lo \ + TextMessage.lo UUIDGenerator.lo Destination.lo PrimitiveMap.lo \ + Sem.lo +libamq_corelib_la_OBJECTS = $(am_libamq_corelib_la_OBJECTS) +DEFAULT_INCLUDES = -I. -I$(srcdir) +depcomp = $(SHELL) $(top_srcdir)/depcomp +am__depfiles_maybe = depfiles +CXXCOMPILE = $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) \ + $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) +LTCXXCOMPILE = $(LIBTOOL) --tag=CXX --mode=compile $(CXX) $(DEFS) \ + $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) \ + $(AM_CXXFLAGS) $(CXXFLAGS) +CXXLD = $(CXX) +CXXLINK = $(LIBTOOL) --tag=CXX --mode=link $(CXXLD) $(AM_CXXFLAGS) \ + $(CXXFLAGS) $(AM_LDFLAGS) $(LDFLAGS) -o $@ +SOURCES = $(libamq_corelib_la_SOURCES) +DIST_SOURCES = $(libamq_corelib_la_SOURCES) +includeHEADERS_INSTALL = $(INSTALL_HEADER) +HEADERS = $(include_HEADERS) +ETAGS = etags +CTAGS = ctags +DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST) +ACLOCAL = @ACLOCAL@ +AMDEP_FALSE = @AMDEP_FALSE@ +AMDEP_TRUE = @AMDEP_TRUE@ +AMTAR = @AMTAR@ +AR = @AR@ +AUTOCONF = @AUTOCONF@ +AUTOHEADER = @AUTOHEADER@ +AUTOMAKE = @AUTOMAKE@ +AWK = @AWK@ +CC = @CC@ +CCDEPMODE = @CCDEPMODE@ +CFLAGS = @CFLAGS@ +CPP = @CPP@ +CPPFLAGS = @CPPFLAGS@ +CXX = @CXX@ +CXXCPP = @CXXCPP@ +CXXDEPMODE = @CXXDEPMODE@ +CXXFLAGS = @CXXFLAGS@ +CYGPATH_W = @CYGPATH_W@ +DEFS = @DEFS@ +DEPDIR = @DEPDIR@ +ECHO = @ECHO@ +ECHO_C = @ECHO_C@ +ECHO_N = @ECHO_N@ +ECHO_T = @ECHO_T@ +EGREP = @EGREP@ +EXEEXT = @EXEEXT@ +F77 = @F77@ +FFLAGS = @FFLAGS@ +INSTALL_DATA = @INSTALL_DATA@ +INSTALL_PROGRAM = @INSTALL_PROGRAM@ +INSTALL_SCRIPT = @INSTALL_SCRIPT@ +INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@ +LDFLAGS = @LDFLAGS@ +LIBOBJS = @LIBOBJS@ +LIBS = @LIBS@ +LIBTOOL = @LIBTOOL@ +LN_S = @LN_S@ +LTLIBOBJS = @LTLIBOBJS@ +MAKEINFO = @MAKEINFO@ +OBJEXT = @OBJEXT@ +PACKAGE = @PACKAGE@ +PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@ +PACKAGE_NAME = @PACKAGE_NAME@ +PACKAGE_STRING = @PACKAGE_STRING@ +PACKAGE_TARNAME = @PACKAGE_TARNAME@ +PACKAGE_VERSION = @PACKAGE_VERSION@ +PATH_SEPARATOR = @PATH_SEPARATOR@ +RANLIB = @RANLIB@ +SET_MAKE = @SET_MAKE@ +SHELL = @SHELL@ +STRIP = @STRIP@ +VERSION = @VERSION@ +ac_ct_AR = @ac_ct_AR@ +ac_ct_CC = @ac_ct_CC@ +ac_ct_CXX = @ac_ct_CXX@ +ac_ct_F77 = @ac_ct_F77@ +ac_ct_RANLIB = @ac_ct_RANLIB@ +ac_ct_STRIP = @ac_ct_STRIP@ +am__fastdepCC_FALSE = @am__fastdepCC_FALSE@ +am__fastdepCC_TRUE = @am__fastdepCC_TRUE@ +am__fastdepCXX_FALSE = @am__fastdepCXX_FALSE@ +am__fastdepCXX_TRUE = @am__fastdepCXX_TRUE@ +am__include = @am__include@ +am__leading_dot = @am__leading_dot@ +am__quote = @am__quote@ +am__tar = @am__tar@ +am__untar = @am__untar@ +bindir = @bindir@ +build = @build@ +build_alias = @build_alias@ +build_cpu = @build_cpu@ +build_os = @build_os@ +build_vendor = @build_vendor@ +datadir = @datadir@ +exec_prefix = @exec_prefix@ +host = @host@ +host_alias = @host_alias@ +host_cpu = @host_cpu@ +host_os = @host_os@ +host_vendor = @host_vendor@ +includedir = @includedir@ +infodir = @infodir@ +install_sh = @install_sh@ +libdir = @libdir@ +libexecdir = @libexecdir@ +localstatedir = @localstatedir@ +mandir = @mandir@ +mkdir_p = @mkdir_p@ +oldincludedir = @oldincludedir@ +prefix = @prefix@ +program_transform_name = @program_transform_name@ +sbindir = @sbindir@ +sharedstatedir = @sharedstatedir@ +sysconfdir = @sysconfdir@ +target_alias = @target_alias@ +INCLUDES = -I../ -I../marshal +lib_LTLIBRARIES = libamq_corelib.la +libamq_corelib_la_SOURCES = BlockingMessageConsumer.cpp BlockingMessageConsumerRef.cpp CoreLib.cpp CoreLibImpl.cpp ExceptionCallback.cpp Lock.cpp LogLevel.cpp MessageConsumer.cpp MessageConsumerRef.cpp NonBlockingMessageConsumer.cpp NonBlockingMessageConsumerRef.cpp StompMessage.cpp TextMessage.cpp UUIDGenerator.cpp Destination.cpp PrimitiveMap.cpp Sem.cpp +libamq_corelib_la_LIBADD = -L../marshal -lamq_marshal -L../command -lamq_command +include_HEADERS = BlockingMessageConsumer.h BlockingMessageConsumerRef.h Buffer.h BytesMessage.h CoreLib.h CoreLibImpl_.h Destination.h ExceptionCallback.h Exception.h Lock.h Logger.h LogLevel.h MessageConsumer.h MessageConsumerRef.h Message.h NonBlockingMessageConsumer.h NonBlockingMessageConsumerRef.h NullLogger.h RCSID.h StompMessage.h TextMessage.h UUIDGenerator.h PrimitiveMap.h Sem.h +all: all-am + +.SUFFIXES: +.SUFFIXES: .cpp .lo .o .obj +$(srcdir)/Makefile.in: $(srcdir)/Makefile.am $(am__configure_deps) + @for dep in $?; do \ + case '$(am__configure_deps)' in \ + *$$dep*) \ + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh \ + && exit 0; \ + exit 1;; \ + esac; \ + done; \ + echo ' cd $(top_srcdir) && $(AUTOMAKE) --foreign amq_corelib/Makefile'; \ + cd $(top_srcdir) && \ + $(AUTOMAKE) --foreign amq_corelib/Makefile +.PRECIOUS: Makefile +Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status + @case '$?' in \ + *config.status*) \ + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \ + *) \ + echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe)'; \ + cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe);; \ + esac; + +$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh + +$(top_srcdir)/configure: $(am__configure_deps) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh +$(ACLOCAL_M4): $(am__aclocal_m4_deps) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh +install-libLTLIBRARIES: $(lib_LTLIBRARIES) + @$(NORMAL_INSTALL) + test -z "$(libdir)" || $(mkdir_p) "$(DESTDIR)$(libdir)" + @list='$(lib_LTLIBRARIES)'; for p in $$list; do \ + if test -f $$p; then \ + f=$(am__strip_dir) \ + echo " $(LIBTOOL) --mode=install $(libLTLIBRARIES_INSTALL) $(INSTALL_STRIP_FLAG) '$$p' '$(DESTDIR)$(libdir)/$$f'"; \ + $(LIBTOOL) --mode=install $(libLTLIBRARIES_INSTALL) $(INSTALL_STRIP_FLAG) "$$p" "$(DESTDIR)$(libdir)/$$f"; \ + else :; fi; \ + done + +uninstall-libLTLIBRARIES: + @$(NORMAL_UNINSTALL) + @set -x; list='$(lib_LTLIBRARIES)'; for p in $$list; do \ + p=$(am__strip_dir) \ + echo " $(LIBTOOL) --mode=uninstall rm -f '$(DESTDIR)$(libdir)/$$p'"; \ + $(LIBTOOL) --mode=uninstall rm -f "$(DESTDIR)$(libdir)/$$p"; \ + done + +clean-libLTLIBRARIES: + -test -z "$(lib_LTLIBRARIES)" || rm -f $(lib_LTLIBRARIES) + @list='$(lib_LTLIBRARIES)'; for p in $$list; do \ + dir="`echo $$p | sed -e 's|/[^/]*$$||'`"; \ + test "$$dir" != "$$p" || dir=.; \ + echo "rm -f \"$${dir}/so_locations\""; \ + rm -f "$${dir}/so_locations"; \ + done +libamq_corelib.la: $(libamq_corelib_la_OBJECTS) $(libamq_corelib_la_DEPENDENCIES) + $(CXXLINK) -rpath $(libdir) $(libamq_corelib_la_LDFLAGS) $(libamq_corelib_la_OBJECTS) $(libamq_corelib_la_LIBADD) $(LIBS) + +mostlyclean-compile: + -rm -f *.$(OBJEXT) + +distclean-compile: + -rm -f *.tab.c + +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BlockingMessageConsumer.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BlockingMessageConsumerRef.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CoreLib.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CoreLibImpl.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Destination.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ExceptionCallback.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Lock.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/LogLevel.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/MessageConsumer.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/MessageConsumerRef.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/NonBlockingMessageConsumer.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/NonBlockingMessageConsumerRef.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PrimitiveMap.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Sem.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/StompMessage.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/TextMessage.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/UUIDGenerator.Plo@am__quote@ + +.cpp.o: +@am__fastdepCXX_TRUE@ if $(CXXCOMPILE) -MT $@ -MD -MP -MF "$(DEPDIR)/$*.Tpo" -c -o $@ $<; \ +@am__fastdepCXX_TRUE@ then mv -f "$(DEPDIR)/$*.Tpo" "$(DEPDIR)/$*.Po"; else rm -f "$(DEPDIR)/$*.Tpo"; exit 1; fi +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='$<' object='$@' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCXX_FALSE@ $(CXXCOMPILE) -c -o $@ $< + +.cpp.obj: +@am__fastdepCXX_TRUE@ if $(CXXCOMPILE) -MT $@ -MD -MP -MF "$(DEPDIR)/$*.Tpo" -c -o $@ `$(CYGPATH_W) '$<'`; \ +@am__fastdepCXX_TRUE@ then mv -f "$(DEPDIR)/$*.Tpo" "$(DEPDIR)/$*.Po"; else rm -f "$(DEPDIR)/$*.Tpo"; exit 1; fi +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='$<' object='$@' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCXX_FALSE@ $(CXXCOMPILE) -c -o $@ `$(CYGPATH_W) '$<'` + +.cpp.lo: +@am__fastdepCXX_TRUE@ if $(LTCXXCOMPILE) -MT $@ -MD -MP -MF "$(DEPDIR)/$*.Tpo" -c -o $@ $<; \ +@am__fastdepCXX_TRUE@ then mv -f "$(DEPDIR)/$*.Tpo" "$(DEPDIR)/$*.Plo"; else rm -f "$(DEPDIR)/$*.Tpo"; exit 1; fi +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='$<' object='$@' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCXX_FALSE@ $(LTCXXCOMPILE) -c -o $@ $< + +mostlyclean-libtool: + -rm -f *.lo + +clean-libtool: + -rm -rf .libs _libs + +distclean-libtool: + -rm -f libtool +uninstall-info-am: +install-includeHEADERS: $(include_HEADERS) + @$(NORMAL_INSTALL) + test -z "$(includedir)" || $(mkdir_p) "$(DESTDIR)$(includedir)" + @list='$(include_HEADERS)'; for p in $$list; do \ + if test -f "$$p"; then d=; else d="$(srcdir)/"; fi; \ + f=$(am__strip_dir) \ + echo " $(includeHEADERS_INSTALL) '$$d$$p' '$(DESTDIR)$(includedir)/$$f'"; \ + $(includeHEADERS_INSTALL) "$$d$$p" "$(DESTDIR)$(includedir)/$$f"; \ + done + +uninstall-includeHEADERS: + @$(NORMAL_UNINSTALL) + @list='$(include_HEADERS)'; for p in $$list; do \ + f=$(am__strip_dir) \ + echo " rm -f '$(DESTDIR)$(includedir)/$$f'"; \ + rm -f "$(DESTDIR)$(includedir)/$$f"; \ + done + +ID: $(HEADERS) $(SOURCES) $(LISP) $(TAGS_FILES) + list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \ + unique=`for i in $$list; do \ + if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \ + done | \ + $(AWK) ' { files[$$0] = 1; } \ + END { for (i in files) print i; }'`; \ + mkid -fID $$unique +tags: TAGS + +TAGS: $(HEADERS) $(SOURCES) $(TAGS_DEPENDENCIES) \ + $(TAGS_FILES) $(LISP) + tags=; \ + here=`pwd`; \ + list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \ + unique=`for i in $$list; do \ + if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \ + done | \ + $(AWK) ' { files[$$0] = 1; } \ + END { for (i in files) print i; }'`; \ + if test -z "$(ETAGS_ARGS)$$tags$$unique"; then :; else \ + test -n "$$unique" || unique=$$empty_fix; \ + $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \ + $$tags $$unique; \ + fi +ctags: CTAGS +CTAGS: $(HEADERS) $(SOURCES) $(TAGS_DEPENDENCIES) \ + $(TAGS_FILES) $(LISP) + tags=; \ + here=`pwd`; \ + list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \ + unique=`for i in $$list; do \ + if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \ + done | \ + $(AWK) ' { files[$$0] = 1; } \ + END { for (i in files) print i; }'`; \ + test -z "$(CTAGS_ARGS)$$tags$$unique" \ + || $(CTAGS) $(CTAGSFLAGS) $(AM_CTAGSFLAGS) $(CTAGS_ARGS) \ + $$tags $$unique + +GTAGS: + here=`$(am__cd) $(top_builddir) && pwd` \ + && cd $(top_srcdir) \ + && gtags -i $(GTAGS_ARGS) $$here + +distclean-tags: + -rm -f TAGS ID GTAGS GRTAGS GSYMS GPATH tags + +distdir: $(DISTFILES) + @srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`; \ + topsrcdirstrip=`echo "$(top_srcdir)" | sed 's|.|.|g'`; \ + list='$(DISTFILES)'; for file in $$list; do \ + case $$file in \ + $(srcdir)/*) file=`echo "$$file" | sed "s|^$$srcdirstrip/||"`;; \ + $(top_srcdir)/*) file=`echo "$$file" | sed "s|^$$topsrcdirstrip/|$(top_builddir)/|"`;; \ + esac; \ + if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \ + dir=`echo "$$file" | sed -e 's,/[^/]*$$,,'`; \ + if test "$$dir" != "$$file" && test "$$dir" != "."; then \ + dir="/$$dir"; \ + $(mkdir_p) "$(distdir)$$dir"; \ + else \ + dir=''; \ + fi; \ + if test -d $$d/$$file; then \ + if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \ + cp -pR $(srcdir)/$$file $(distdir)$$dir || exit 1; \ + fi; \ + cp -pR $$d/$$file $(distdir)$$dir || exit 1; \ + else \ + test -f $(distdir)/$$file \ + || cp -p $$d/$$file $(distdir)/$$file \ + || exit 1; \ + fi; \ + done +check-am: all-am +check: check-am +all-am: Makefile $(LTLIBRARIES) $(HEADERS) +installdirs: + for dir in "$(DESTDIR)$(libdir)" "$(DESTDIR)$(includedir)"; do \ + test -z "$$dir" || $(mkdir_p) "$$dir"; \ + done +install: install-am +install-exec: install-exec-am +install-data: install-data-am +uninstall: uninstall-am + +install-am: all-am + @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am + +installcheck: installcheck-am +install-strip: + $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \ + install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \ + `test -z '$(STRIP)' || \ + echo "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'"` install +mostlyclean-generic: + +clean-generic: + +distclean-generic: + -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES) + +maintainer-clean-generic: + @echo "This command is intended for maintainers to use" + @echo "it deletes files that may require special tools to rebuild." +clean: clean-am + +clean-am: clean-generic clean-libLTLIBRARIES clean-libtool \ + mostlyclean-am + +distclean: distclean-am + -rm -rf ./$(DEPDIR) + -rm -f Makefile +distclean-am: clean-am distclean-compile distclean-generic \ + distclean-libtool distclean-tags + +dvi: dvi-am + +dvi-am: + +html: html-am + +info: info-am + +info-am: + +install-data-am: install-includeHEADERS + +install-exec-am: install-libLTLIBRARIES + +install-info: install-info-am + +install-man: + +installcheck-am: + +maintainer-clean: maintainer-clean-am + -rm -rf ./$(DEPDIR) + -rm -f Makefile +maintainer-clean-am: distclean-am maintainer-clean-generic + +mostlyclean: mostlyclean-am + +mostlyclean-am: mostlyclean-compile mostlyclean-generic \ + mostlyclean-libtool + +pdf: pdf-am + +pdf-am: + +ps: ps-am + +ps-am: + +uninstall-am: uninstall-includeHEADERS uninstall-info-am \ + uninstall-libLTLIBRARIES + +.PHONY: CTAGS GTAGS all all-am check check-am clean clean-generic \ + clean-libLTLIBRARIES clean-libtool ctags distclean \ + distclean-compile distclean-generic distclean-libtool \ + distclean-tags distdir dvi dvi-am html html-am info info-am \ + install install-am install-data install-data-am install-exec \ + install-exec-am install-includeHEADERS install-info \ + install-info-am install-libLTLIBRARIES install-man \ + install-strip installcheck installcheck-am installdirs \ + maintainer-clean maintainer-clean-generic mostlyclean \ + mostlyclean-compile mostlyclean-generic mostlyclean-libtool \ + pdf pdf-am ps ps-am tags uninstall uninstall-am \ + uninstall-includeHEADERS uninstall-info-am \ + uninstall-libLTLIBRARIES + +# Tell versions [3.59,3.63) of GNU make to not export all variables. +# Otherwise a system limit (for SysV at least) may be exceeded. +.NOEXPORT: Added: incubator/activemq/trunk/amazon/amq_corelib/Message.h URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Message.h?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/Message.h (added) +++ incubator/activemq/trunk/amazon/amq_corelib/Message.h Mon Jul 31 02:36:40 2006 @@ -0,0 +1,104 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#ifndef ACTIVEMQ_MESSAGE_H +#define ACTIVEMQ_MESSAGE_H + +#include + +#include + +#include "Buffer.h" +#include "Destination.h" + +#include "command/CommandTypes.h" + +namespace ActiveMQ { + /// Represents an ActiveMQ message + /** + Contains an ActiveMQ message. The specific types + (ObjectMessage, BytesMessage, etc) are subclasses. + + @version $Id$ + */ + class Message { + protected: + Message() {} + public: + /// gets the type + /** + Gets the integer type of this message. + + @returns the type + */ + virtual int getType() const = 0; + + /// gets the destination + /** + Gets the Destination that this message was/will be sent to. + + @returns destination of this message + */ + const Destination& getDestination() const { return destination_; } + + /// sets the destination + void setDestination(const Destination& d) { destination_ = d; } + + /// gets the reply-to destination + /** + Gets the "reply-to" Destination for this Message. + + @returns the Destination for replies to this Message. + */ + const Destination& getReplyTo() const { return replyTo_; } + + /// sets the reply-to destination + /** + Sets the "reply-to" Destination for this Message. + + @param dest the destination to ask for replies to this message on + */ + void setReplyTo(const Destination& dest) { replyTo_ = dest; } + + /// gets the time received + /** + Gets the time (in seconds since the epoch) that this + Message was received. + + @returns time message was received + */ + time_t getTimestamp() const; + + /// gets the underlying bytes + /** + Gets this Message's payload as an array of bytes. + + @returns message data + */ + virtual void marshall(Buffer& fillIn) const = 0; + + /// Destructor + virtual ~Message() {}; + private: + Destination destination_; + Destination replyTo_; + }; +}; + +#endif // ACTIVEMQ_MESSAGE_H Propchange: incubator/activemq/trunk/amazon/amq_corelib/Message.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/Message.h ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Added: incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.cpp URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.cpp?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.cpp (added) +++ incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.cpp Mon Jul 31 02:36:40 2006 @@ -0,0 +1,38 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#include + +#include "MessageConsumer.h" +#include "Message.h" +#include "RCSID.h" + +using namespace ActiveMQ; + +RCSID(MessageConsumer, "$Id$"); + +MessageConsumer::HasDest::HasDest(const Destination& d) + : dest_(d) + {} + +bool +MessageConsumer::HasDest::operator()(const Message *m) { + assert(m != NULL); + return m->getDestination() == dest_; +} Propchange: incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.cpp ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Added: incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.h URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.h?rev=427057&view=auto ============================================================================== --- incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.h (added) +++ incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.h Mon Jul 31 02:36:40 2006 @@ -0,0 +1,84 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#ifndef ACTIVEMQ_MSGCONSUMER_H +#define ACTIVEMQ_MSGCONSUMER_H + +#include + +namespace ActiveMQ { + class Message; + class CoreLib; + class Destination; + + /// Base class for MessageConsumers + /** + This class provides the common interface for MessageConsumers, + which are objects that allow the application to receive + messages. See the documentation for + ActiveMQ::NonBlockingMessageConsumer and + ActiveMQ::BlockingMessageConsumer. + + Note that the application never actually deals with one of + these directly - the application receives MessageConsumerRef + objects - weak, counted references to MessageConsumers. + + @version $Id$ + */ + class MessageConsumer { + public: + /// gets the number of messages that are ready + /** + Gets the number of waiting messages. + + @returns the number + */ + virtual unsigned int getNumReadyMessages() const = 0; + + /// receives a message + /** + Pulls a message from the internal queue. It is returned as + a std::auto_ptr to make the ownership policy clear. + + @returns the new message + */ + virtual std::auto_ptr receive() = 0; + + protected: + friend class CoreLibImpl; + MessageConsumer() {} + virtual ~MessageConsumer() {}; + virtual void enqueue(Message *msg) = 0; + virtual void removeQueued(const Destination& dest) = 0; + + class HasDest { + public: + HasDest(const Destination& d); + bool operator()(const Message *m); + private: + const Destination &dest_; + }; + + private: + MessageConsumer(const MessageConsumer& oth); + MessageConsumer &operator=(const MessageConsumer& oth); + }; +}; + +#endif // ACTIVEMQ_MSGCONSUMER_H Propchange: incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.h ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL