activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r427057 [4/22] - in /incubator/activemq/trunk/amazon: ./ amq_brokersession/ amq_corelib/ amq_examples/ amq_examples/bs_async_recv/ amq_examples/bs_send/ amq_examples/bs_sync_recv/ amq_examples/cl_send/ amq_transport/ command/ marshal/
Date Mon, 31 Jul 2006 09:36:51 GMT
Added: incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl.cpp?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl.cpp (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl.cpp Mon Jul 31 02:36:40 2006
@@ -0,0 +1,780 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#include <netinet/in.h>
+#include <unistd.h>
+
+#include "RCSID.h"
+#include <algorithm>
+#include <string>
+#include <sstream>
+#include <list>
+#include <utility>
+#include <iostream>
+
+#include "TextMessage.h"
+#include "BytesMessage.h"
+
+#include "PrimitiveMap.h"
+#include "MessageConsumer.h"
+#include "NonBlockingMessageConsumer.h"
+#include "BlockingMessageConsumer.h"
+#include "NonBlockingMessageConsumerRef.h"
+#include "BlockingMessageConsumerRef.h"
+#include "Exception.h"
+#include "Destination.h"
+
+#include "CoreLibImpl_.h"
+
+#include "StompMessage.h"
+#include "NullLogger.h"
+
+#include "command/CommandTypes.h"
+#include "command/ConnectionInfo.h"
+#include "command/SessionInfo.h"
+#include "command/ConsumerInfo.h"
+#include "command/WireFormatInfo.h"
+#include "command/ProducerInfo.h"
+#include "command/ShutdownInfo.h"
+#include "command/ActiveMQTopic.h"
+#include "command/ActiveMQTempTopic.h"
+#include "command/ActiveMQTempQueue.h"
+#include "command/ActiveMQQueue.h"
+#include "command/ActiveMQTextMessage.h"
+#include "command/ActiveMQBytesMessage.h"
+#include "command/ActiveMQDestination.h"
+#include "command/MessageDispatch.h"
+#include "command/Response.h"
+#include "command/BrokerInfo.h"
+#include "command/RemoveInfo.h"
+#include "command/DestinationInfo.h"
+#include "command/MessageAck.h"
+#include "marshal/BaseDataStreamMarshaller.h"
+#include "marshal/ProtocolFormat.h"
+#include "marshal/MarshallerFactory.h"
+#include "marshal/BufferWriter.h"
+#include "marshal/BufferReader.h"
+
+using namespace ActiveMQ;
+using namespace std;
+using boost::shared_ptr;
+
+RCSID(CoreLibImpl, "$Id$");
+
+// Logging helpers, one per log level.  Each macro forwards msg to the
+// matching method on logger_, but only when a logger is installed and that
+// level is enabled.  The expansions name logger_ directly, so they can only
+// be used where a logger_ object is in scope.  The do { ... } while(0)
+// wrapper makes every expansion behave as a single statement.
+#define FATAL(msg) \
+    do { if (logger_.get() && logger_->isEnabled(LogLevel::Fatal)) \
+        logger_->logFatal(msg); } while(0)
+
+#define ERROR(msg) \
+    do { if (logger_.get() && logger_->isEnabled(LogLevel::Error)) \
+        logger_->logError(msg); } while(0)
+
+#define WARNING(msg) \
+    do { if (logger_.get() && logger_->isEnabled(LogLevel::Warning)) \
+        logger_->logWarning(msg); } while(0)
+
+#define INFORM(msg) \
+    do { if (logger_.get() && logger_->isEnabled(LogLevel::Inform)) \
+        logger_->logInform(msg); } while(0)
+
+#define DEBUG(msg) \
+    do { if (logger_.get() && logger_->isEnabled(LogLevel::Debug)) \
+        logger_->logDebug(msg); } while(0)
+
+// Version number written into the WireFormatInfo command.
+const static int OPENWIRE_VERSION = 1;
+// Prefetch size requested in every ConsumerInfo sent by subscribe().
+const static int PREFETCH_SIZE    = 32766;
+// Initial value of sessionId_; a single session is used per instance.
+const static int SESSION_ID       = 12345;
+
+// Ack type placed on every MessageAck generated for a received message.
+const static int STANDARD_ACK     = 2;
+
+// Operation types used with DestinationInfo::setOperationType().
+const static int ADD_DEST         = 0;
+const static int REMOVE_DEST      = 1;
+
+// Source of the client id and connection id assigned in the constructor.
+const static UUIDGenerator uuidgen;
+
+// Magic string carried by the WireFormatInfo command.
+const static string activemq = "ActiveMQ";
+
+// Builds the implementation object for the CoreLib instance `parent`.
+//
+// parent   - handed on to the consumer references and destinations created
+//            by this object.
+// user     - user name later placed in the ConnectionInfo by initialize().
+// password - password later placed in the ConnectionInfo by initialize().
+//
+// All id counters start at zero, the frame parser starts outside a message,
+// and logging goes to a NullLogger until setLogger() is called.  No wire data
+// is produced here; initialize() appends the handshake commands to a buffer.
+CoreLibImpl::CoreLibImpl(CoreLib *parent,
+                         const string& user,
+                         const string& password) :
+    parent_(parent),
+    user_(user),
+    password_(password),
+    sizeBufPos_(0),
+    inMessage_(false),
+    yetToRecv_(0),
+    sessionId_(SESSION_ID),
+    nextCommandId_(0),
+    nextProducerId_(0),
+    nextConsumerId_(0),
+    nextTempDestId_(0),
+    logger_(new NullLogger()),
+    initialized_(false) {
+
+    // Register the marshallers with this instance's protocol format.
+    Marshalling::MarshallerFactory::configure(pf_);
+    // Client and connection ids are freshly generated for every instance.
+    clientId_ = uuidgen.getGuid();
+    connectionId_ = uuidgen.getGuid();
+}
+
+// Moves any bytes queued in pendingBuffer_ to the end of b and leaves the
+// pending buffer empty.  Does nothing when there is no pending data.
+void
+CoreLibImpl::marshalPending_(Buffer& b) {
+    if (pendingBuffer_.empty())
+        return;
+
+    b.insert(b.end(), pendingBuffer_.begin(), pendingBuffer_.end());
+    pendingBuffer_.clear();
+}
+
+// Stamps o with the next command id, then marshals it into b through the
+// AbstractCommand overload.
+void
+CoreLibImpl::marshalCommand_(Command::BaseCommand& o, Buffer& b) {
+    const int commandId = nextCommandId_;
+    ++nextCommandId_;
+    o.setCommandId(commandId);
+
+    Command::AbstractCommand& asAbstract = static_cast<Command::AbstractCommand&>(o);
+    marshalCommand_(asAbstract, b);
+}
+
+// Appends a hand-encoded WireFormatInfo frame to b.
+//
+// Layout, all integers big-endian:
+//   4 bytes  length of everything that follows
+//   1 byte   type (1)
+//   8 bytes  magic "ActiveMQ"
+//   4 bytes  OpenWire version
+//   1 byte   marshalled-properties-present flag (1)
+//   4 bytes  length of the marshalled properties
+//   n bytes  the marshalled properties of wfi
+//
+// wfi - supplies the marshalled properties.
+// b   - receives the frame; bytes already in b are left untouched.
+void loosemarshalcommand(Command::WireFormatInfo& wfi, Buffer& b) {
+    // Remember where this frame starts so that the length prefix is placed
+    // in front of the frame rather than in front of whatever b already holds.
+    const size_t start = b.size();
+
+    // type
+    b.push_back(1);
+    // magic
+    b.insert(b.end(), activemq.begin(), activemq.end());
+    // openwire version; a fixed 32-bit value so that exactly the four
+    // network-order bytes are copied regardless of the host's word size
+    const uint32_t version = htonl(OPENWIRE_VERSION);
+    const uint8_t *versionBytes = reinterpret_cast<const uint8_t *>(&version);
+    b.insert(b.end(), versionBytes, versionBytes + sizeof(version));
+
+    // marshalled properties: fetched once so begin() and end() are taken
+    // from the same object
+    const Buffer& props = wfi.getMarshalledProperties();
+    // true it's there
+    b.push_back(1);
+    // length
+    const uint32_t propsLen = htonl(static_cast<uint32_t>(props.size()));
+    const uint8_t *propsLenBytes = reinterpret_cast<const uint8_t *>(&propsLen);
+    b.insert(b.end(), propsLenBytes, propsLenBytes + sizeof(propsLen));
+    // data
+    b.insert(b.end(), props.begin(), props.end());
+
+    // size prefix: 1 + 8 + 4 + 1 + 4 + properties, i.e. everything written
+    // since `start`
+    const uint32_t length = htonl(static_cast<uint32_t>(b.size() - start));
+    const uint8_t *lengthBytes = reinterpret_cast<const uint8_t *>(&length);
+    b.insert(b.begin() + start, lengthBytes, lengthBytes + sizeof(length));
+}
+
+// Marshals o into b as one frame: a 4-byte big-endian size, one byte holding
+// the command type, the boolean-stream bitset, then the command data.
+//
+// o - command to encode; its command type selects the marshaller.
+// b - buffer the frame is appended to.
+//
+// Throws Exception when no marshaller is registered for o's command type.
+void
+CoreLibImpl::marshalCommand_(Command::AbstractCommand& o, Buffer& b) {
+    IO::BooleanStream bs;
+
+    // get the marshaller
+    Marshalling::BaseDataStreamMarshaller* marshaller = pf_.getMarshaller(o.getCommandType());
+    if (marshaller == NULL) {
+        // Build the whole text with <<.  Seeding a stringstream through its
+        // constructor leaves the put position at the start, so subsequent
+        // insertions would overwrite the seed text instead of following it.
+        stringstream exmsg;
+        exmsg << "Invalid command type (";
+        exmsg << o.getCommandType();
+        exmsg << ") passed to marshalCommand";
+        throw Exception(exmsg.str());
+    }
+
+    // first pass gets the size and writes the flags bitset
+    uint32_t size = marshaller->marshal1(pf_, o, bs);
+    size = htonl(size + bs.getMarshalledSize());
+
+    // first 4 bytes of a message are its size
+    b.insert(b.end(), (uint8_t *)(&size), (uint8_t *)(&size) + 4);
+
+    // then one byte that is the command type
+    b.push_back(o.getCommandType());
+
+    IO::BufferWriter bw(b);
+    // then the bitset
+    bs.marshal(bw);
+
+    // then the second marshalling pass which actually writes the data out
+    marshaller->marshal2(pf_, o, bw, bs);
+}
+
+// Decodes the contents of b into o using the marshaller registered for o's
+// command type.
+//
+// o - command object to fill in; its command type selects the marshaller.
+// b - bytes to read the command from.
+//
+// Throws Exception when no marshaller is registered for o's command type.
+void
+CoreLibImpl::unmarshalCommand_(Command::AbstractCommand& o, const Buffer& b) {
+    IO::BufferReader br(b);
+    IO::BooleanStream bs(br);
+
+    Marshalling::BaseDataStreamMarshaller *marshaller = pf_.getMarshaller(o.getCommandType());
+    if (marshaller == NULL) {
+        // Build the whole text with <<.  Seeding a stringstream through its
+        // constructor leaves the put position at the start, so subsequent
+        // insertions would overwrite the seed text instead of following it.
+        stringstream exmsg;
+        exmsg << "Invalid command type (";
+        exmsg << o.getCommandType();
+        exmsg << ") received from broker";
+        throw Exception(exmsg.str());
+    }
+    marshaller->unmarshal(pf_, o, br, bs);
+}
+
+// Appends the connection handshake to b: a WireFormatInfo, a ConnectionInfo
+// and a SessionInfo, in that order, and then marks this object initialized.
+void
+CoreLibImpl::initialize(Buffer& b) {
+    // --- WireFormatInfo ---------------------------------------------------
+    // Always the first packet sent.  It is not a real negotiation; see
+    // https://issues.apache.org/activemq/browse/AMQ-681
+    Command::WireFormatInfo wireFormat;
+
+    vector<uint8_t> magicBytes(activemq.begin(), activemq.end());
+    wireFormat.setMagic(magicBytes);
+    wireFormat.setVersion(OPENWIRE_VERSION);
+
+    // Ask for "tight" (length-prefixed) encoding via the option map.
+    PrimitiveMap options;
+    options.putBoolean("TightEncodingEnabled", true);
+
+    Buffer encodedOptions;
+    options.marshal(encodedOptions);
+    wireFormat.setMarshalledProperties(encodedOptions);
+
+    // This command goes through the hand-written encoder rather than
+    // marshalCommand_().
+    loosemarshalcommand(wireFormat, b);
+
+    // --- ConnectionInfo ---------------------------------------------------
+    shared_ptr<Command::ConnectionId> connId(new Command::ConnectionId());
+    connId->setValue(connectionId_);
+
+    Command::ConnectionInfo connInfo;
+    connInfo.setUserName(user_);
+    connInfo.setPassword(password_);
+    connInfo.setClientId(clientId_);
+    connInfo.setConnectionId(connId);
+
+    marshalCommand_(connInfo, b);
+
+    // --- SessionInfo ------------------------------------------------------
+    shared_ptr<Command::SessionId> sessId(new Command::SessionId());
+    sessId->setConnectionId(connectionId_);
+    sessId->setValue(sessionId_);
+
+    Command::SessionInfo sessInfo;
+    sessInfo.setSessionId(sessId);
+
+    marshalCommand_(sessInfo, b);
+
+    initialized_ = true;
+}
+
+// Publishes m to the destination recorded on the message itself.
+void
+CoreLibImpl::publish(const Message& m, Buffer& b) {
+    const Destination& target = m.getDestination();
+    publish(target, m, b);
+}
+
+// Appends to b the commands needed to send m to destination d.
+//
+// d - destination to publish to.
+// m - message to send; only text and bytes messages are supported.
+// b - buffer the commands are appended to.  Pending data is flushed into it
+//     first, and a ProducerInfo precedes the message the first time a
+//     message is published.
+//
+// Throws Exception, before anything is written to b, when m is neither a
+// text nor a bytes message.
+void
+CoreLibImpl::publish(const Destination& d,
+                     const Message& m,
+                     Buffer &b) {
+
+    // Validate the type first: without a matching command class there is no
+    // message object to fill in, and dereferencing the empty auto_ptr below
+    // would be undefined behaviour.
+    const bool isText = (m.getType() == Command::Types::ACTIVEMQ_TEXT_MESSAGE);
+    const bool isBytes = (m.getType() == Command::Types::ACTIVEMQ_BYTES_MESSAGE);
+    if (!isText && !isBytes)
+        throw Exception("Unknown/unimplemented message type passed to publish");
+
+    // Offload any pending data now that we have the opportunity
+    marshalPending_(b);
+
+    shared_ptr<Command::ActiveMQDestination> dest(d.createCommandInstance().release());
+
+    // Create our producer if we haven't already
+    if (producerId_.get() == NULL) {
+        Command::ProducerInfo pi;
+
+        shared_ptr<Command::ProducerId> pid(new Command::ProducerId());
+        pid->setConnectionId(connectionId_);
+        pid->setSessionId(sessionId_);
+        pid->setValue(nextProducerId_++);
+        producerId_ = pid;
+
+        pi.setProducerId(producerId_);
+        marshalCommand_(pi, b);
+    }
+
+    auto_ptr<Command::Message> message;
+
+    if (isText)
+        message.reset(new Command::ActiveMQTextMessage());
+    else
+        message.reset(new Command::ActiveMQBytesMessage());
+
+    message->setProducerId(producerId_);
+    message->setDestination(dest);
+    message->setPersistent(false);
+
+    // A reply-to destination is attached only when the message names one.
+    if (!(m.getReplyTo().getName().empty())) {
+        shared_ptr<Command::ActiveMQDestination> replyTo(m.getReplyTo().createCommandInstance().release());
+        message->setReplyTo(replyTo);
+    }
+
+    shared_ptr<Command::MessageId> mid(new Command::MessageId());
+
+    mid->setProducerId(producerId_);
+    message->setMessageId(mid);
+
+    Buffer content;
+    // serialize the message into the buffer
+    m.marshall(content);
+
+    message->setContent(content);
+
+    marshalCommand_(*message, b);
+}
+
+// Appends the shutdown sequence to b: pending data, a RemoveInfo for the
+// session, a RemoveInfo for the connection and finally a ShutdownInfo.
+// Clears the initialized flag afterwards.
+void
+CoreLibImpl::disconnect(Buffer& b) {
+    marshalPending_(b);
+
+    // Tear down the session first ...
+    shared_ptr<Command::SessionId> sessionToRemove(new Command::SessionId());
+    sessionToRemove->setValue(sessionId_);
+    sessionToRemove->setConnectionId(connectionId_);
+
+    Command::RemoveInfo removeSession;
+    removeSession.setObjectId(sessionToRemove);
+    marshalCommand_(removeSession, b);
+
+    // ... then the connection that carried it ...
+    shared_ptr<Command::ConnectionId> connectionToRemove(new Command::ConnectionId());
+    connectionToRemove->setValue(connectionId_);
+
+    Command::RemoveInfo removeConnection;
+    removeConnection.setObjectId(connectionToRemove);
+    marshalCommand_(removeConnection, b);
+
+    // ... and close with a ShutdownInfo.
+    Command::ShutdownInfo shutdown;
+    marshalCommand_(shutdown, b);
+
+    initialized_ = false;
+}
+
+// Invalidates every outstanding consumer reference and registered
+// destination so that none of them calls back into this object, and releases
+// the consumers through deregisterRef().
+CoreLibImpl::~CoreLibImpl() {
+    // deregisterRef() removes the reference from consumerRefs_, which would
+    // invalidate an iterator pointing at it.  Draining from the front avoids
+    // holding an iterator across that call.
+    while (!consumerRefs_.empty()) {
+        MessageConsumerRef *ref = consumerRefs_.front();
+        ref->invalidate();
+        deregisterRef(ref);
+    }
+    for (list<const Destination *>::iterator i = allRegisteredDestinations_.begin();
+         i != allRegisteredDestinations_.end(); ++i) {
+        // The list stores pointers to const; invalidate() is non-const.
+        const_cast<Destination*>(*i)->invalidate();
+    }
+}
+
+// Routes messages for destination d to the consumer behind q and appends a
+// ConsumerInfo for d to b (after any pending data).  A consumer id is
+// allocated for d the first time it is seen and reused afterwards.
+void
+CoreLibImpl::subscribe(const Destination& d, MessageConsumerRef& q, Buffer& b) {
+    marshalPending_(b);
+
+    MessageConsumer *consumer = q.getConsumer();
+    // insert() keeps an existing mapping for d if there already is one
+    destinationMaps_.insert(pair<Destination, MessageConsumer *>(d, consumer));
+
+    map<Destination, shared_ptr<Command::ConsumerId> >::iterator known = consumerIds_.find(d);
+    if (known == consumerIds_.end()) {
+        // first subscription to this destination: allocate its consumer id
+        shared_ptr<Command::ConsumerId> freshId(new Command::ConsumerId());
+        freshId->setConnectionId(connectionId_);
+        freshId->setSessionId(sessionId_);
+        freshId->setValue(nextConsumerId_++);
+        consumerIds_[d] = freshId;
+    }
+
+    shared_ptr<Command::ActiveMQDestination> wireDest(d.createCommandInstance().release());
+
+    Command::ConsumerInfo info;
+    info.setConsumerId(consumerIds_[d]);
+    info.setDestination(wireDest);
+    info.setPrefetchSize(PREFETCH_SIZE);
+    info.setDispatchAsync(true);
+
+    marshalCommand_(info, b);
+}
+
+// Stops consumption from destination d.  Drops the messages queued for d on
+// its consumer and, when d has a consumer id, appends a RemoveInfo for that
+// id to b and forgets it.
+//
+// Throws Exception when no consumer is registered for d.
+void
+CoreLibImpl::unsubscribe(const Destination& d, Buffer& b) {
+    marshalPending_(b);
+
+    map<Destination, MessageConsumer*>::iterator subscription = destinationMaps_.find(d);
+    if (subscription == destinationMaps_.end())
+        throw Exception("Not subscribed to destination " + d.getName());
+    subscription->second->removeQueued(d);
+
+    map<Destination, shared_ptr<Command::ConsumerId> >::iterator idEntry = consumerIds_.find(d);
+    if (idEntry == consumerIds_.end())
+        return; // no consumer id was allocated for this destination
+
+    // Remove the consumer for this destination
+    shared_ptr<Command::IDataStructure> consumerToRemove(idEntry->second);
+    Command::RemoveInfo removal;
+    removal.setObjectId(consumerToRemove);
+
+    marshalCommand_(removal, b);
+
+    consumerIds_.erase(idEntry);
+}
+
+// Convenience overload: feeds the whole of `incoming` to the pointer/length
+// overload.  Commands generated in response are appended to b.
+void
+CoreLibImpl::handleData(const Buffer& incoming, Buffer& b) {
+    // An empty buffer carries nothing to parse, and taking the address of
+    // element 0 of an empty vector is undefined.
+    if (incoming.empty())
+        return;
+    handleData(&incoming[0], incoming.size(), b);
+}
+
+// Processes one complete frame received from the broker.
+//
+// buf - the frame without its 4-byte size prefix; must hold at least the
+//       command-type byte.  The type byte is stripped from buf here.
+// b   - receives any commands generated in response (acks, shutdown).
+//
+// BrokerInfo is ignored, ShutdownInfo triggers disconnect(), and
+// MessageDispatch is acknowledged and handed to the consumer registered for
+// its destination.  Any other command type is silently dropped.
+void
+CoreLibImpl::unmarshalBuffer(vector<uint8_t>& buf, Buffer& b) {
+    int type = buf[0];
+
+    buf.erase(buf.begin());
+
+    switch (type) {
+    case Command::Types::BROKER_INFO: {
+        // BrokerInfo
+        // Ignore it, nothing to really do with it
+    }
+        break;
+    case Command::Types::SHUTDOWN_INFO: {
+        // ShutdownInfo, broker is exiting
+        Command::ShutdownInfo s;
+
+        unmarshalCommand_(s, buf);
+
+        INFORM("Got ShutdownInfo, shutting down");
+        disconnect(b);
+    }
+        break;
+    case Command::Types::MESSAGE_DISPATCH: {
+        // MessageDispatch, new message arriving
+        Command::MessageDispatch md;
+
+        unmarshalCommand_(md, buf);
+
+        shared_ptr<const Command::Message> m = md.getMessage();
+        if (m.get() != NULL) {
+            shared_ptr<const Command::ActiveMQDestination> dest = m->getDestination();
+            if (dest.get() != NULL) {
+
+                // Ack the message
+
+                Command::MessageAck ack;
+
+                auto_ptr<Command::ActiveMQDestination> destToAck;
+                Destination myd;
+
+                switch(dest->getCommandType()) {
+                case Command::Types::ACTIVEMQ_TOPIC:
+                    destToAck.reset(new Command::ActiveMQTopic());
+                    myd = Destination(parent_, dest->getPhysicalName(), false /* isTemp */, true /* isTopic */);
+                    break;
+                case Command::Types::ACTIVEMQ_QUEUE:
+                    destToAck.reset(new Command::ActiveMQQueue());
+                    myd = Destination(parent_, dest->getPhysicalName(), false /* isTemp */, false /* isTopic */);
+                    break;
+                case Command::Types::ACTIVEMQ_TEMP_TOPIC:
+                    destToAck.reset(new Command::ActiveMQTempTopic());
+                    myd = Destination(parent_, dest->getPhysicalName(), true /* isTemp */, true /* isTopic */);
+                    break;
+                case Command::Types::ACTIVEMQ_TEMP_QUEUE:
+                    destToAck.reset(new Command::ActiveMQTempQueue());
+                    myd = Destination(parent_, dest->getPhysicalName(), true /* isTemp */, false /* isTopic */);
+                    break;
+                default:
+                    // Without this case destToAck stays empty and would be
+                    // dereferenced below.
+                    WARNING("Received message for unknown destination type");
+                    return;
+                };
+
+                destToAck->setPhysicalName(dest->getPhysicalName());
+
+                // Look the consumer id up without inserting: operator[] would
+                // add an empty entry for an unknown destination, and a later
+                // subscribe() would then reuse that empty id.
+                shared_ptr<Command::ConsumerId> cidToAck;
+                map<Destination, shared_ptr<Command::ConsumerId> >::const_iterator known = consumerIds_.find(myd);
+                if (known != consumerIds_.end())
+                    cidToAck = known->second;
+
+                ack.setConsumerId(cidToAck);
+                ack.setDestination(shared_ptr<Command::ActiveMQDestination>(destToAck));
+                ack.setAckType(STANDARD_ACK); // tells broker to discard the message
+                ack.setMessageCount(1);
+                marshalCommand_(ack, b);
+
+                const string& ourdestname(dest->getPhysicalName());
+
+                map<Destination, MessageConsumer *>::iterator i = destinationMaps_.find(myd);
+                if (i == destinationMaps_.end())
+                    WARNING("No MessageConsumer registered for received message on destination " + ourdestname);
+                else {
+                    int mtype = m->getCommandType();
+                    auto_ptr<Message> toenqueue;
+                    switch (mtype) {
+                    case Command::Types::ACTIVEMQ_TEXT_MESSAGE:
+                        toenqueue.reset(new TextMessage(m->getContent()));
+                        break;
+                    case Command::Types::ACTIVEMQ_BYTES_MESSAGE:
+                        toenqueue.reset(new BytesMessage(m->getContent()));
+                        break;
+                    default:
+                        // The ack above has already been written to b.
+                        WARNING("Unknown/unimplemented message command type");
+                        return;
+                    };
+
+                    toenqueue->setDestination(myd);
+
+                    if (m->getReplyTo() != NULL) {
+                        const bool rtoIsTemp =  m->getReplyTo()->getCommandType() == Command::Types::ACTIVEMQ_TEMP_QUEUE ||
+                                             m->getReplyTo()->getCommandType() == Command::Types::ACTIVEMQ_TEMP_TOPIC;
+
+                        const bool rtoIsTopic = m->getReplyTo()->getCommandType() == Command::Types::ACTIVEMQ_TEMP_TOPIC ||
+                                             m->getReplyTo()->getCommandType() == Command::Types::ACTIVEMQ_TOPIC;
+
+                        Destination rto(parent_, m->getReplyTo()->getPhysicalName(), rtoIsTemp, rtoIsTopic);
+
+                        toenqueue->setReplyTo(rto);
+                    }
+
+                    // The consumer receives the released pointer.
+                    i->second->enqueue(toenqueue.release());
+                }
+            }
+        }
+    }
+        break;
+    }
+}
+
+// Incremental frame parser for data received from the broker.
+//
+// buf, len - the bytes received; they may contain a fragment of a frame, a
+//            whole frame, or several frames.  Parser state is kept between
+//            calls in sizeBuf_/sizeBufPos_, unmarshalBuffer_, inMessage_ and
+//            yetToRecv_.
+// b        - receives any commands generated while handling complete frames.
+//
+// Each frame is a 4-byte big-endian size followed by that many bytes.  Every
+// completed frame is passed to unmarshalBuffer().
+//
+// Throws Exception when a frame announces a size of zero.
+void
+CoreLibImpl::handleData(const uint8_t *buf, size_t len, Buffer& b) {
+    // No bytes means no state change.
+    if (buf == NULL || len == 0)
+        return;
+
+    // Loop instead of recursing so that a buffer holding many small frames
+    // does not grow the stack with one call per frame.
+    for (;;) {
+        if (!inMessage_) {
+            // Gather the size prefix.  It is copied out with memcpy because
+            // neither buf nor sizeBuf_ is guaranteed to be aligned for a
+            // direct uint32_t read.
+            uint32_t netsize = 0;
+            if (sizeBufPos_ == 0 && len >= 4) {
+                // whole prefix available in this buffer
+                memcpy(&netsize, buf, 4);
+                buf += 4;
+                len -= 4;
+            }
+            else if (len + sizeBufPos_ >= 4) {
+                // prefix completed by combining saved and new bytes
+                const size_t missing = 4 - sizeBufPos_;
+                memcpy(sizeBuf_ + sizeBufPos_, buf, missing);
+                memcpy(&netsize, sizeBuf_, 4);
+                buf += missing;
+                len -= missing;
+            }
+            else {
+                // still not enough for a prefix: save what we have
+                memcpy(sizeBuf_ + sizeBufPos_, buf, len);
+                sizeBufPos_ += len;
+                return;
+            }
+
+            const uint32_t msgsize = ntohl(netsize);
+            if (msgsize == 0)
+                throw Exception("Zero-length message - corrupt data stream");
+
+            unmarshalBuffer_.resize(0);
+            unmarshalBuffer_.reserve(msgsize);
+            yetToRecv_ = msgsize;
+            inMessage_ = true;
+            sizeBufPos_ = 0;
+        }
+
+        if (len < yetToRecv_) {
+            // not yet complete: keep the fragment and wait for more data
+            unmarshalBuffer_.insert(unmarshalBuffer_.end(), buf, buf + len);
+            yetToRecv_ -= len;
+            return;
+        }
+
+        // The remainder of the current frame is available.
+        const size_t thismsg = yetToRecv_;
+        inMessage_ = false;
+        unmarshalBuffer_.insert(unmarshalBuffer_.end(), buf, buf + thismsg);
+        unmarshalBuffer(unmarshalBuffer_, b);
+
+        // Anything left over belongs to the next frame.
+        buf += thismsg;
+        len -= thismsg;
+        if (len == 0)
+            return;
+    }
+}
+
+// Creates a blocking consumer, records it with a reference count of zero and
+// returns a reference handle for it.
+BlockingMessageConsumerRef
+CoreLibImpl::newBlockingMessageConsumer(void) {
+    BlockingMessageConsumer *consumer = new BlockingMessageConsumer();
+
+    consumers_.push_back(consumer);
+    refCounts_[consumer] = 0;
+
+    return BlockingMessageConsumerRef(parent_, consumer);
+}
+
+// Creates a non-blocking consumer, records it with a reference count of zero
+// and returns a reference handle for it.
+NonBlockingMessageConsumerRef
+CoreLibImpl::newNonBlockingMessageConsumer(void) {
+    NonBlockingMessageConsumer *consumer = new NonBlockingMessageConsumer();
+
+    consumers_.push_back(consumer);
+    refCounts_[consumer] = 0;
+
+    return NonBlockingMessageConsumerRef(parent_, consumer);
+}
+
+// Records a new reference handle and bumps the reference count of the
+// consumer it points at.  A NULL handle is ignored.
+void
+CoreLibImpl::registerRef(MessageConsumerRef *q) {
+    if (q != NULL) {
+        MessageConsumer *consumer = q->getConsumer();
+        consumerRefs_.push_back(q);
+        refCounts_[consumer] += 1;
+    }
+}
+
+// Forgets a reference handle and decrements the reference count of its
+// consumer.  When the count reaches zero the consumer is dropped from every
+// destination mapping and from the consumer list, and is deleted.  A NULL
+// handle is ignored.
+void
+CoreLibImpl::deregisterRef(MessageConsumerRef *q) {
+    if (q == NULL)
+        return;
+
+    MessageConsumer *consumer = q->getConsumer();
+    const int remaining = --refCounts_[consumer];
+
+    if (remaining == 0) {
+        refCounts_.erase(consumer);
+
+        // Drop every destination routed to this consumer.  The iterator is
+        // advanced before erasing so that it never refers to a removed node.
+        map<Destination, MessageConsumer *>::iterator cursor = destinationMaps_.begin();
+        while (cursor != destinationMaps_.end()) {
+            map<Destination, MessageConsumer *>::iterator current = cursor++;
+            if (current->second == consumer)
+                destinationMaps_.erase(current);
+        }
+
+        consumers_.remove(consumer);
+        delete consumer;
+    }
+
+    consumerRefs_.remove(q);
+}
+
+// Installs lgr as the logger, taking ownership of it and destroying the
+// previous one.  An empty auto_ptr installs a NullLogger instead, so that
+// getLogger() always has an object to return a reference to.
+void
+CoreLibImpl::setLogger(auto_ptr<Logger> lgr) {
+    if (lgr.get() == NULL)
+        lgr.reset(new NullLogger());
+    logger_ = lgr;
+}
+
+// Returns the logger currently installed on this object.
+Logger&
+CoreLibImpl::getLogger() {
+    Logger *current = logger_.get();
+    return *current;
+}
+
+// Counts one more live Destination object for d's string form and remembers
+// the address of d itself.
+void
+CoreLibImpl::registerDest(const Destination& d) {
+    // operator[] creates a zero entry for a key seen for the first time, so
+    // the increment covers both the "new" and the "existing" case.
+    ++destRefCounts_[d.toString()];
+
+    allRegisteredDestinations_.push_back(&d);
+}
+
+// Releases one reference to destination d.  Unknown destinations are
+// ignored.  When the last reference goes away, a RemoveInfo for d's consumer
+// (if it has one) and, for temporary destinations created on this
+// connection, a DestinationInfo removing the destination are queued in the
+// pending buffer.
+void
+CoreLibImpl::unregisterDest(const Destination& d) {
+    map<std::string, int>::iterator counter = destRefCounts_.find(d.toString());
+    if (counter == destRefCounts_.end())
+        return;
+
+    allRegisteredDestinations_.remove(&d);
+
+    if (counter->second != 1) {
+        // other references remain
+        counter->second--;
+        return;
+    }
+
+    // This was the last reference to the destination.
+
+    // Remove its consumer, if there is one.
+    map<Destination, shared_ptr<Command::ConsumerId> >::iterator idEntry = consumerIds_.find(d);
+    if (idEntry != consumerIds_.end()) {
+        shared_ptr<Command::IDataStructure> consumerToRemove(idEntry->second);
+        Command::RemoveInfo removal;
+        removal.setObjectId(consumerToRemove);
+        marshalCommand_(removal, pendingBuffer_);
+
+        consumerIds_.erase(idEntry);
+    }
+
+    // Remove the destination itself when we created it: a temporary
+    // destination is ours if its name contains our connection id.
+    const bool createdHere = d.isTemporary() && d.getName().find(connectionId_) != string::npos;
+    if (createdHere) {
+        shared_ptr<Command::ConnectionId> connId(new Command::ConnectionId());
+        connId->setValue(connectionId_);
+        shared_ptr<Command::ActiveMQDestination> wireDest(d.createCommandInstance().release());
+
+        Command::DestinationInfo info;
+        info.setDestination(wireDest);
+        info.setOperationType(REMOVE_DEST);
+        info.setConnectionId(connId);
+
+        marshalCommand_(info, pendingBuffer_);
+    }
+
+    destRefCounts_.erase(counter);
+}
+
+// Creates a temporary topic named "<connection id>:<counter>" and queues the
+// DestinationInfo that adds it in the pending buffer.
+Destination
+CoreLibImpl::createTemporaryTopic() {
+    stringstream nameBuilder;
+    nameBuilder << connectionId_ << ":" << nextTempDestId_++;
+    Destination ret(parent_, nameBuilder.str(), true /* isTemp */, true /* isTopic */);
+
+    shared_ptr<Command::ConnectionId> connId(new Command::ConnectionId());
+    connId->setValue(connectionId_);
+
+    shared_ptr<Command::ActiveMQDestination> wireDest(new Command::ActiveMQTempTopic());
+    wireDest->setPhysicalName(ret.getName());
+
+    Command::DestinationInfo addInfo;
+    addInfo.setDestination(wireDest);
+    addInfo.setOperationType(ADD_DEST);
+    addInfo.setConnectionId(connId);
+
+    // There is no outgoing buffer here, so the command waits in the pending
+    // buffer.
+    marshalCommand_(addInfo, pendingBuffer_);
+
+    return ret;
+}
+
+// Creates a temporary queue named "<connection id>:<counter>" and queues the
+// DestinationInfo that adds it in the pending buffer.
+Destination
+CoreLibImpl::createTemporaryQueue() {
+    stringstream nameBuilder;
+    nameBuilder << connectionId_ << ":" << nextTempDestId_++;
+    Destination ret(parent_, nameBuilder.str(), true /* isTemp */, false /* isTopic */);
+
+    shared_ptr<Command::ConnectionId> connId(new Command::ConnectionId());
+    connId->setValue(connectionId_);
+
+    shared_ptr<Command::ActiveMQDestination> wireDest(new Command::ActiveMQTempQueue());
+    wireDest->setPhysicalName(ret.getName());
+
+    Command::DestinationInfo addInfo;
+    addInfo.setDestination(wireDest);
+    addInfo.setOperationType(ADD_DEST);
+    addInfo.setConnectionId(connId);
+
+    // There is no outgoing buffer here, so the command waits in the pending
+    // buffer.
+    marshalCommand_(addInfo, pendingBuffer_);
+
+    return ret;
+}
+
+// Returns a non-temporary topic destination called `name`.
+Destination
+CoreLibImpl::createTopic(const std::string& name) {
+    const bool isTemp = false;
+    const bool isTopic = true;
+    return Destination(parent_, name, isTemp, isTopic);
+}
+
+// Returns a non-temporary queue destination called `name`.
+Destination
+CoreLibImpl::createQueue(const std::string& name) {
+    const bool isTemp = false;
+    const bool isTopic = false;
+    return Destination(parent_, name, isTemp, isTopic);
+}

Propchange: incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl_.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl_.h?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl_.h (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl_.h Mon Jul 31 02:36:40 2006
@@ -0,0 +1,150 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#ifndef ACTIVEMQ_CORELIB_IMPL_H
+#define ACTIVEMQ_CORELIB_IMPL_H
+
+#include <string>
+#include <map>
+#include <list>
+#include <memory>
+
+#include <boost/shared_ptr.hpp>
+
+#include "Destination.h"
+
+#include "Message.h"
+
+#include "Buffer.h"
+#include "BlockingMessageConsumerRef.h"
+#include "NonBlockingMessageConsumerRef.h"
+#include "Logger.h"
+#include "UUIDGenerator.h"
+
+#include "marshal/ProtocolFormat.h"
+
+namespace ActiveMQ {
+    // Forward declarations of the command classes named in this header.
+    namespace Command {
+        class BaseCommand;
+        class AbstractCommand;
+        class ProducerId;
+        class ConsumerId;
+    };
+
+    class CoreLib;
+    /// Private implementation class for CoreLib
+    ///
+    /// The class performs no I/O itself: operations append outgoing bytes to
+    /// a caller-supplied Buffer, and incoming bytes are fed in through
+    /// handleData().  Apart from the destructor every member is private;
+    /// CoreLib reaches them through the friend declaration below.
+    class CoreLibImpl {
+    public:
+        virtual ~CoreLibImpl();
+    private:
+        friend class CoreLib;
+        CoreLibImpl(CoreLib *parent,
+                    const std::string& user,
+                    const std::string& password);
+        /// Appends the WireFormatInfo, ConnectionInfo and SessionInfo
+        /// commands to b.
+        void initialize(Buffer& b);
+        /// Appends RemoveInfo commands for the session and the connection,
+        /// followed by a ShutdownInfo, to b.
+        void disconnect(Buffer& b);
+        /// Appends the commands that send msg to b.  The two-argument form
+        /// uses the destination stored on the message.
+        void publish(const Message& msg, Buffer& b);
+        void publish(const Destination& dest,
+                     const Message& msg,
+                     Buffer& b);
+        /// Routes messages for dest to q's consumer and appends a
+        /// ConsumerInfo to b.
+        void subscribe(const Destination& dest,
+                       MessageConsumerRef& q,
+                       Buffer& b);
+        /// Throws Exception when dest has no registered consumer; otherwise
+        /// appends a RemoveInfo for its consumer id, if it has one.
+        void unsubscribe(const Destination& dest,
+                         Buffer& b);
+        /// Feed bytes received from the broker into the frame parser; any
+        /// commands generated in response are appended to b.
+        void handleData(const Buffer& incoming, Buffer& b);
+        void handleData(const uint8_t *buf, size_t len, Buffer& b);
+        /// Create a consumer tracked by this object and return a reference
+        /// handle to it.  The consumer is deleted by deregisterRef() when
+        /// its reference count drops to zero.
+        NonBlockingMessageConsumerRef newNonBlockingMessageConsumer();
+        BlockingMessageConsumerRef newBlockingMessageConsumer();
+        /// Takes ownership of lgr and makes it the current logger.
+        void setLogger(std::auto_ptr<Logger> lgr);
+        Logger& getLogger();
+        /// Create a temporary destination and queue the DestinationInfo that
+        /// adds it in the pending buffer.
+        Destination createTemporaryTopic();
+        Destination createTemporaryQueue();
+        /// Create a non-temporary destination object with the given name.
+        Destination createTopic(const std::string& name);
+        Destination createQueue(const std::string& name);
+        /// Reference counting of Destination objects, keyed by
+        /// Destination::toString().
+        void registerDest(const Destination& d);
+        void unregisterDest(const Destination& d);
+
+        // Copy construction and assignment are declared private.
+        // NOTE(review): presumably left undefined to forbid copying - confirm.
+        CoreLibImpl(const CoreLibImpl &);
+        CoreLibImpl& operator=(const CoreLibImpl &);
+
+        CoreLib *parent_; // for passing to new MessageConsumerRef objects and destinations
+
+        const std::string user_;
+        const std::string password_;
+        // destination -> consumer that receives its messages
+        std::map<Destination, MessageConsumer *> destinationMaps_;
+        // every consumer created by this object and not yet deleted
+        std::list<MessageConsumer *> consumers_;
+        // every reference handle currently registered
+        std::list<MessageConsumerRef *> consumerRefs_;
+        // consumer -> number of registered reference handles
+        std::map<MessageConsumer *, int> refCounts_;
+
+        // Destinations are reference-counted
+        std::map<std::string, int> destRefCounts_;
+        std::list<const Destination*> allRegisteredDestinations_;
+
+        // State of asynchronous message unmarshalling
+        std::vector<uint8_t> unmarshalBuffer_;
+        // bytes of a partially received 4-byte size prefix, and how many of
+        // them have been stored so far
+        uint8_t sizeBuf_[4];
+        uint8_t sizeBufPos_;
+        
+        // Operations that don't have a way to pass up outgoing data
+        // can use the pending buffer.
+        std::vector<uint8_t> pendingBuffer_;
+        void marshalPending_(Buffer& b);
+        
+        // Unmarshals a full buffer containing one message
+        void unmarshalBuffer(std::vector<uint8_t>& buf, Buffer& b);
+        // internal data used to track message unmarshalling state
+        bool inMessage_;
+        size_t yetToRecv_;
+
+        Marshalling::ProtocolFormat pf_;
+
+        // We only keep one connection / session open for the entire time
+        int64_t sessionId_;
+        std::string clientId_;
+        std::string connectionId_;
+        // Only one JMS producer is used for all outgoing messages
+        boost::shared_ptr<Command::ProducerId> producerId_;
+
+        // subject -> consumer id mapping
+        std::map<Destination, boost::shared_ptr<Command::ConsumerId> > consumerIds_;
+
+        // keep track of ids to use in allocation of new objects
+        int nextCommandId_;
+        int nextProducerId_;
+        int nextConsumerId_;
+        int nextTempDestId_;
+
+        // Marshal a command into a buffer
+        // (the BaseCommand overload also assigns the next command id)
+        void marshalCommand_(Command::AbstractCommand& o, Buffer& b);
+        void marshalCommand_(Command::BaseCommand& o, Buffer& b);
+
+        // Unmarshal a command from a buffer
+        void unmarshalCommand_(Command::AbstractCommand& o, const Buffer& b);
+
+        std::auto_ptr<Logger> logger_;
+
+        // Track reference handles; deregisterRef deletes the consumer when
+        // its last handle is deregistered.
+        void registerRef(MessageConsumerRef *mc);
+        void deregisterRef(MessageConsumerRef *mc);
+
+        // set by initialize(), cleared by disconnect()
+        bool initialized_;
+    };
+};
+
+#endif // ACTIVEMQ_CORELIB_IMPL_H

Propchange: incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl_.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/CoreLibImpl_.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/amazon/amq_corelib/Destination.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Destination.cpp?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/Destination.cpp (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/Destination.cpp Mon Jul 31 02:36:40 2006
@@ -0,0 +1,120 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#include <assert.h>
+
+#include "Destination.h"
+#include "CoreLib.h"
+
+#include "command/ActiveMQTempTopic.h"
+#include "command/ActiveMQTempQueue.h"
+#include "command/ActiveMQQueue.h"
+#include "command/ActiveMQTopic.h"
+
+#include <iostream>
+#include <sstream>
+
+using namespace ActiveMQ;
+using namespace std;
+
+// Default construction yields a detached destination: empty name, no
+// CoreLib, non-temporary, not a topic.  Nothing is registered anywhere.
+Destination::Destination()
+    : name_(),
+      cl_(NULL),
+      temporary_(false),
+      isTopic_(false)
+{
+}
+
+// Builds a destination bound to corelib (which must not be NULL), caches
+// its descriptive string and registers the new object with the CoreLib.
+Destination::Destination(CoreLib *corelib, const std::string& name, bool isTemp, bool isTopic)
+    : name_(name),
+      cl_(corelib),
+      temporary_(isTemp),
+      isTopic_(isTopic)
+{
+    // Compose "<name>:isTemporary=<0|1>:isTopic=<0|1>" once, up front.
+    ostringstream description;
+    description << name_ << ":isTemporary=" << temporary_ << ":isTopic=" << isTopic_;
+    string_ = description.str();
+
+    assert(cl_ != NULL);
+    cl_->registerDest(*this);
+}
+
+// Copy constructor: duplicates every field of oth and, when the source is
+// still bound to a CoreLib, registers the new copy with that CoreLib too.
+Destination::Destination(const Destination& oth)
+    : name_(oth.name_),
+      cl_(oth.cl_),
+      temporary_(oth.temporary_),
+      isTopic_(oth.isTopic_),
+      string_(oth.string_)
+{
+    if (NULL == cl_)
+        return;
+    cl_->registerDest(*this);
+}
+
+// Copy assignment: moves this object's registration from its current
+// CoreLib (if any) to the CoreLib of oth (if any) and copies all fields.
+Destination&
+Destination::operator=(const Destination& oth) {
+    // Self-assignment leaves both the fields and the registration alone.
+    if (this == &oth)
+        return *this;
+
+    // Leave the CoreLib we were registered with so far ...
+    if (cl_ != NULL)
+        cl_->unregisterDest(*this);
+
+    cl_ = oth.cl_;
+    name_ = oth.name_;
+    temporary_ = oth.temporary_;
+    isTopic_ = oth.isTopic_;
+    string_ = oth.string_;
+
+    // ... and join the one just taken over from oth.
+    if (cl_ != NULL)
+        cl_->registerDest(*this);
+
+    return *this;
+}
+
+// Cuts the link to the CoreLib.  With cl_ cleared, the copy constructor,
+// operator= and the destructor skip their register/unregister calls.
+void Destination::invalidate()
+{
+    cl_ = NULL;
+}
+
+// Unregisters from the CoreLib unless this destination is detached
+// (default-constructed or invalidated).
+Destination::~Destination()
+{
+    if (NULL == cl_)
+        return;
+    cl_->unregisterDest(*this);
+}
+
+// Creates the command object matching this destination's kind
+// (temp topic, temp queue, topic or queue) and stamps it with name_.
+// Ownership of the new object passes to the caller via the auto_ptr.
+auto_ptr<Command::ActiveMQDestination>
+Destination::createCommandInstance() const {
+    const bool temp = isTemporary();
+    const bool topic = isTopic();
+
+    auto_ptr<Command::ActiveMQDestination> cmd;
+    if (temp && topic)
+        cmd.reset(new Command::ActiveMQTempTopic());
+    else if (temp)
+        cmd.reset(new Command::ActiveMQTempQueue());
+    else if (topic)
+        cmd.reset(new Command::ActiveMQTopic());
+    else
+        cmd.reset(new Command::ActiveMQQueue());
+
+    cmd->setPhysicalName(name_);
+    return cmd;
+}

Propchange: incubator/activemq/trunk/amazon/amq_corelib/Destination.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/Destination.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/amazon/amq_corelib/Destination.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Destination.h?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/Destination.h (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/Destination.h Mon Jul 31 02:36:40 2006
@@ -0,0 +1,81 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#ifndef ACTIVEMQ_DESTINATION_H
+#define ACTIVEMQ_DESTINATION_H
+
+#include <string>
+#include <memory>
+
+#include "command/ActiveMQDestination.h"
+
+namespace ActiveMQ {
+    class CoreLib;
+    class CoreLibImpl;
+
+    /// Message destination
+    /**
+       This class holds an ActiveMQ "destination."  It can be either a
+       JMS Topic or Queue, or temporary versions of either of those.
+
+       Destinations are always constructed from the BrokerSession or
+       CoreLib objects, as there is broker communication involved in
+       setting up and tearing down destinations.
+
+       @version $Id$
+     */
+    class Destination {
+    public:
+        /// Gets the name of the destination
+        const std::string& getName() const { return name_; }
+
+        /// Gets a normalized descriptive string of the destination
+        std::string toString() const { return string_; }
+
+        /// indicates whether or not the destination is temporary
+        bool isTemporary() const { return temporary_; }
+
+        /// indicates whether or not the destination is a topic
+        bool isTopic() const { return isTopic_; }
+
+        /// Creates a detached destination (empty name, no CoreLib).
+        Destination();
+        virtual ~Destination();
+        Destination(const Destination& oth);
+        Destination& operator=(const Destination& oth);
+
+        /// Strict weak ordering over (name, temporary flag, topic flag).
+        /// It looks at exactly the fields operator== compares, so two
+        /// destinations are equivalent as ordered-container keys only
+        /// when they are also equal; a queue and a topic sharing a name
+        /// no longer collide in a std::map keyed by Destination.
+        bool operator<(const Destination& oth) const {
+            if (name_ != oth.name_)
+                return name_ < oth.name_;
+            if (temporary_ != oth.temporary_)
+                return temporary_ < oth.temporary_;
+            return isTopic_ < oth.isTopic_;
+        }
+
+        /// Equal when name, temporary flag and topic flag all match.
+        bool operator==(const Destination& oth) const { return name_ == oth.name_ && temporary_ == oth.temporary_ && isTopic_ == oth.isTopic_; }
+
+        bool operator!=(const Destination& oth) const { return !operator==(oth); }
+    private:
+        std::string name_;
+        CoreLib *cl_;
+        bool temporary_;
+        bool isTopic_;
+        std::string string_;
+
+        friend class CoreLibImpl;
+        Destination(CoreLib *corelib, const std::string& name, bool isTemp, bool isTopic);
+        std::auto_ptr<Command::ActiveMQDestination> createCommandInstance() const;
+        void invalidate();
+    };
+};
+
+#endif // ACTIVEMQ_DESTINATION_H

Propchange: incubator/activemq/trunk/amazon/amq_corelib/Destination.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/Destination.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/amazon/amq_corelib/Exception.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Exception.h?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/Exception.h (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/Exception.h Mon Jul 31 02:36:40 2006
@@ -0,0 +1,50 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#ifndef ACTIVEMQ_EXCEPTION_H
+#define ACTIVEMQ_EXCEPTION_H
+
+#include <exception>
+#include <string>
+
+namespace ActiveMQ {
+    /// Messaging library exception
+    /**
+       This class represents an error inside the messaging
+       library.  This could be the result of bad function call
+       arguments, or a malformed/error message received back from
+       the broker.
+
+       @version $Id$
+    */
+    class Exception : public std::exception {
+    public:
+        /// Creates an exception carrying a copy of desc.
+        Exception(const std::string& desc) : desc_(desc) {}
+        /// Creates an exception from a C string; a NULL pointer is
+        /// stored as an empty description.  There is deliberately no
+        /// throw() specification: copying the text into a std::string
+        /// may allocate, and therefore may throw.
+        Exception(const char *desc) : desc_(desc ? desc : "") {}
+        Exception(const Exception& oth) : std::exception(oth), desc_(oth.desc_) {}
+        Exception& operator=(const Exception& oth) { desc_ = oth.desc_;
+                                                     return *this; }
+        /// Returns the description; the pointer stays valid until this
+        /// object is assigned to or destroyed.
+        const char *what() const throw() {return desc_.c_str();}
+        virtual ~Exception() throw() {}
+    private:
+        std::string desc_;
+    };
+};
+
+#endif // ACTIVEMQ_EXCEPTION_H

Propchange: incubator/activemq/trunk/amazon/amq_corelib/Exception.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/Exception.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.cpp?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.cpp (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.cpp Mon Jul 31 02:36:40 2006
@@ -0,0 +1,41 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#include <iostream>
+
+#include "RCSID.h"
+#include "ExceptionCallback.h"
+
+using namespace ActiveMQ;
+using namespace std;
+
+RCSID(ExceptionCallback, "$Id$");
+
+// Forwards e to the wrapped callable; an empty callback does nothing.
+void ExceptionCallback::operator()(const Exception& e) const
+{
+    const Impl *target = pimpl.get();
+    if (target)
+        (*target)(e);
+}
+
+// Deep copy: clones the callable held by oth, or stays empty when oth
+// holds none.
+ExceptionCallback::ExceptionCallback(const ExceptionCallback& oth)
+    : pimpl(oth.pimpl.get() ? oth.pimpl->clone() : 0)
+{
+}

Propchange: incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.h?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.h (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.h Mon Jul 31 02:36:40 2006
@@ -0,0 +1,70 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#ifndef ACTIVEMQ_EXCEPTIONCALLBACK_H
+#define ACTIVEMQ_EXCEPTIONCALLBACK_H
+
+#include <memory>
+
+#include "Exception.h"
+
+namespace ActiveMQ {
+    /// Callback for BrokerSession exceptions.
+    /**
+       This class represents a function or function object that will
+       be called for exceptions raised in the BrokerSession's
+       internal event thread.  You shouldn't need to construct it
+       yourself - it will be implicitly constructed from any function
+       or function object that matches the void (*)(const Exception &)
+       signature.
+
+       @version $Id$
+    */       
+    class ExceptionCallback {
+    private:
+        /// Type-erased interface over the stored callable.
+        struct Impl {
+            virtual void operator()(const Exception& msg) const = 0;
+            /// Returns a newly allocated copy that the caller must own.
+            virtual Impl *clone() const = 0;
+            virtual ~Impl() {}
+        };
+
+        /// Adapts a callable of type T to Impl by holding a copy of it.
+        template<class T>
+        struct Wrapper : public Impl {
+            Wrapper(const T& callfunc) : f_(callfunc) {}
+            void operator()(const Exception& msg) const { f_(msg); }
+            Impl *clone() const { return new Wrapper<T>(f_); }
+            virtual ~Wrapper() {}
+        private:
+            const T f_;
+        };
+
+        std::auto_ptr<Impl> pimpl;
+
+    public:
+        /// Creates an empty callback; invoking it does nothing.
+        ExceptionCallback() : pimpl(NULL) {}
+        /// Deep copy: the new object holds its own clone of the callable.
+        ExceptionCallback(const ExceptionCallback& oth);
+
+        /// Copy assignment with the same deep-copy semantics as the copy
+        /// constructor.  Without it the implicitly generated operator
+        /// would assign the auto_ptr member, which transfers ownership
+        /// and silently leaves the right-hand side empty.
+        ExceptionCallback& operator=(const ExceptionCallback& oth) {
+            if (this != &oth)
+                pimpl.reset(oth.pimpl.get() ? oth.pimpl->clone() : NULL);
+            return *this;
+        }
+
+        /// Implicit conversion from a function or function object that
+        /// can be called with a const Exception&.
+        template<class T>
+        ExceptionCallback(T callfunc) : pimpl(new Wrapper<T>(callfunc)) {}
+
+        /// Invokes the stored callable with msg, if there is one.
+        void operator()(const Exception& msg) const;
+    };
+};
+
+#endif // ACTIVEMQ_EXCEPTIONCALLBACK_H

Propchange: incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/ExceptionCallback.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/amazon/amq_corelib/Lock.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Lock.cpp?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/Lock.cpp (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/Lock.cpp Mon Jul 31 02:36:40 2006
@@ -0,0 +1,55 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#include <errno.h>
+#include <stdio.h>
+
+#include "Lock.h"
+#include "Exception.h"
+#include "RCSID.h"
+
+using ActiveMQ::Lock;
+using ActiveMQ::Exception;
+
+RCSID(Lock, "$Id$");
+
+// Acquires the mutex m and remembers that it is held.
+// Throws Exception when m is NULL or when pthread_mutex_lock() fails.
+Lock::Lock(pthread_mutex_t *m)
+  : mutex_(m),
+    locked_(false)
+{
+    if (NULL == m)
+        throw Exception("NULL mutex passed to Lock");
+    // pthread_mutex_lock() reports failure through its return value and
+    // does not set errno, so the message is looked up with rc itself.
+    const int rc = pthread_mutex_lock(m);
+    if (rc != 0)
+        throw Exception((rc < 0 || rc >= sys_nerr)
+                        ? "Unknown error"
+                        : sys_errlist[rc]);
+    locked_ = true;
+}
+
+// Releases the mutex if this object still holds it; calling it again
+// afterwards is a no-op.
+void Lock::unlock()
+{
+    if (!locked_)
+        return;
+    pthread_mutex_unlock(mutex_);
+    locked_ = false;
+}
+
+// Destruction releases the mutex unless unlock() already did.
+Lock::~Lock()
+{
+    this->unlock();
+}

Propchange: incubator/activemq/trunk/amazon/amq_corelib/Lock.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/Lock.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/amazon/amq_corelib/Lock.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Lock.h?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/Lock.h (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/Lock.h Mon Jul 31 02:36:40 2006
@@ -0,0 +1,47 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#ifndef ACTIVEMQ_LOCK_H
+#define ACTIVEMQ_LOCK_H
+
+#include <pthread.h>
+
+namespace ActiveMQ {
+    /// RAII lock class, used internally
+    /**
+       This class holds a lock for its lifetime.  The constructor
+       acquires the lock, and the destructor releases it.  This is to
+       allow exception-safe locking (the lock will be released during
+       stack unwind).
+
+       @version $Id$
+    */
+    class Lock {
+    public:
+        /// Acquires m; throws Exception if m is NULL or locking fails.
+        Lock(pthread_mutex_t *m);
+        /// Explicit unlock for where implicit unlocking is inconvenient.
+        void unlock();
+        /// Releases the mutex if it is still held.
+        ~Lock();
+    private:
+        // Not copyable: a copy would unlock the same mutex a second
+        // time.  Declared private and intentionally never defined.
+        Lock(const Lock&);
+        Lock& operator=(const Lock&);
+
+        pthread_mutex_t *mutex_;
+        bool locked_;
+    };
+};
+
+#endif // ACTIVEMQ_LOCK_H

Propchange: incubator/activemq/trunk/amazon/amq_corelib/Lock.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/Lock.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/amazon/amq_corelib/LogLevel.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/LogLevel.cpp?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/LogLevel.cpp (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/LogLevel.cpp Mon Jul 31 02:36:40 2006
@@ -0,0 +1,52 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#include "LogLevel.h"
+#include "RCSID.h"
+
+using ActiveMQ::LogLevel;
+
+RCSID(LogLevel, "$Id$");
+
+// Display names, indexed by the integer passed to LogLevel's constructor.
+static const char *loglevel_strings[] = {
+    "FATAL", "ERROR", "WARNING", "INFORM", "DEBUG"
+};
+
+// Number of entries in loglevel_strings.
+static const int loglevel_num =
+    static_cast<int>(sizeof(loglevel_strings) / sizeof(loglevel_strings[0]));
+
+// Maps a numeric level onto its display name.  Any index outside the
+// loglevel_strings table, negative or too large, becomes "INVALID".
+LogLevel::LogLevel(int n) {
+    if (n < 0 || n >= loglevel_num)
+        desc_ = "INVALID";
+    else
+        desc_ = loglevel_strings[n];
+}
+
+// Returns the display name chosen when this level was constructed.
+const char *LogLevel::toString() const
+{
+    return this->desc_;
+}
+
+const LogLevel LogLevel::Fatal = LogLevel(0);
+const LogLevel LogLevel::Error = LogLevel(1);
+const LogLevel LogLevel::Warning = LogLevel(2);
+const LogLevel LogLevel::Inform = LogLevel(3);
+const LogLevel LogLevel::Debug = LogLevel(4);

Propchange: incubator/activemq/trunk/amazon/amq_corelib/LogLevel.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/LogLevel.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/amazon/amq_corelib/LogLevel.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/LogLevel.h?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/LogLevel.h (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/LogLevel.h Mon Jul 31 02:36:40 2006
@@ -0,0 +1,53 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#ifndef ACTIVEMQ_LOGLEVEL_H
+#define ACTIVEMQ_LOGLEVEL_H
+
+namespace ActiveMQ {
+    /// Represents a (type-safe) level of logging information.
+    /**
+       @version $Id$
+    */      
+    class LogLevel {
+    public:
+        /// FATAL log level; toString() yields "FATAL"
+        static const LogLevel Fatal;
+
+        /// ERROR log level; toString() yields "ERROR"
+        static const LogLevel Error;
+
+        /// WARNING log level; toString() yields "WARNING"
+        static const LogLevel Warning;
+
+        /// INFORM log level; toString() yields "INFORM"
+        static const LogLevel Inform;
+
+        /// DEBUG log level; toString() yields "DEBUG"
+        static const LogLevel Debug;
+
+        /// Gets the descriptive string for this log level
+        const char *toString() const;
+    private:
+        LogLevel(int n);    // private: callers use the predefined constants (or copies of them)
+        const char *desc_;  // display name; set by the constructor to a string literal
+    };
+};
+
+#endif // ACTIVEMQ_LOGLEVEL_H

Propchange: incubator/activemq/trunk/amazon/amq_corelib/LogLevel.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/LogLevel.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/amazon/amq_corelib/Logger.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Logger.h?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/Logger.h (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/Logger.h Mon Jul 31 02:36:40 2006
@@ -0,0 +1,55 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#ifndef ACTIVEMQ_LOGGER_H
+#define ACTIVEMQ_LOGGER_H
+
+#include <string>
+
+#include "LogLevel.h"
+
+namespace ActiveMQ {
+    /// Log event handler interface
+    /**
+       This class represents a handler for log events.  It is a way
+       for the library to access your local logging policy.
+    */
+    class Logger {
+    public:
+        /// Virtual destructor, so that a concrete logger can be destroyed
+        /// safely through a Logger pointer (for example one held in a
+        /// smart pointer to Logger).
+        virtual ~Logger() {}
+
+        /// Test for a log level being enabled
+        virtual bool isEnabled(const LogLevel& level) = 0;
+        
+        /// Log at FATAL
+        virtual void logFatal(const std::string& msg) = 0;
+
+        /// Log at ERROR
+        virtual void logError(const std::string& msg) = 0;
+
+        /// Log at WARNING
+        virtual void logWarning(const std::string& msg) = 0;
+
+        /// Log at INFORM
+        virtual void logInform(const std::string& msg) = 0;
+
+        /// Log at DEBUG
+        virtual void logDebug(const std::string& msg) = 0;
+    };
+};
+
+#endif // ACTIVEMQ_LOGGER_H

Propchange: incubator/activemq/trunk/amazon/amq_corelib/Logger.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/Logger.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/amazon/amq_corelib/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Makefile.am?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/Makefile.am (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/Makefile.am Mon Jul 31 02:36:40 2006
@@ -0,0 +1,6 @@
+# Builds the libamq_corelib libtool library and installs its headers.
+INCLUDES = -I../ -I../marshal
+
+lib_LTLIBRARIES = libamq_corelib.la
+
+libamq_corelib_la_SOURCES = \
+    BlockingMessageConsumer.cpp \
+    BlockingMessageConsumerRef.cpp \
+    CoreLib.cpp \
+    CoreLibImpl.cpp \
+    ExceptionCallback.cpp \
+    Lock.cpp \
+    LogLevel.cpp \
+    MessageConsumer.cpp \
+    MessageConsumerRef.cpp \
+    NonBlockingMessageConsumer.cpp \
+    NonBlockingMessageConsumerRef.cpp \
+    StompMessage.cpp \
+    TextMessage.cpp \
+    UUIDGenerator.cpp \
+    Destination.cpp \
+    PrimitiveMap.cpp \
+    Sem.cpp
+
+libamq_corelib_la_LIBADD = \
+    -L../marshal -lamq_marshal \
+    -L../command -lamq_command
+
+include_HEADERS = \
+    BlockingMessageConsumer.h \
+    BlockingMessageConsumerRef.h \
+    Buffer.h \
+    BytesMessage.h \
+    CoreLib.h \
+    CoreLibImpl_.h \
+    Destination.h \
+    ExceptionCallback.h \
+    Exception.h \
+    Lock.h \
+    Logger.h \
+    LogLevel.h \
+    MessageConsumer.h \
+    MessageConsumerRef.h \
+    Message.h \
+    NonBlockingMessageConsumer.h \
+    NonBlockingMessageConsumerRef.h \
+    NullLogger.h \
+    RCSID.h \
+    StompMessage.h \
+    TextMessage.h \
+    UUIDGenerator.h \
+    PrimitiveMap.h \
+    Sem.h
+

Added: incubator/activemq/trunk/amazon/amq_corelib/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Makefile.in?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/Makefile.in (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/Makefile.in Mon Jul 31 02:36:40 2006
@@ -0,0 +1,495 @@
+# Makefile.in generated by automake 1.9.6 from Makefile.am.
+# @configure_input@
+
+# Copyright (C) 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002,
+# 2003, 2004, 2005  Free Software Foundation, Inc.
+# This Makefile.in is free software; the Free Software Foundation
+# gives unlimited permission to copy and/or distribute it,
+# with or without modifications, as long as this notice is preserved.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY, to the extent permitted by law; without
+# even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+# PARTICULAR PURPOSE.
+
+@SET_MAKE@
+
+
+srcdir = @srcdir@
+top_srcdir = @top_srcdir@
+VPATH = @srcdir@
+pkgdatadir = $(datadir)/@PACKAGE@
+pkglibdir = $(libdir)/@PACKAGE@
+pkgincludedir = $(includedir)/@PACKAGE@
+top_builddir = ..
+am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd
+INSTALL = @INSTALL@
+install_sh_DATA = $(install_sh) -c -m 644
+install_sh_PROGRAM = $(install_sh) -c
+install_sh_SCRIPT = $(install_sh) -c
+INSTALL_HEADER = $(INSTALL_DATA)
+transform = $(program_transform_name)
+NORMAL_INSTALL = :
+PRE_INSTALL = :
+POST_INSTALL = :
+NORMAL_UNINSTALL = :
+PRE_UNINSTALL = :
+POST_UNINSTALL = :
+build_triplet = @build@
+host_triplet = @host@
+subdir = amq_corelib
+DIST_COMMON = $(include_HEADERS) $(srcdir)/Makefile.am \
+	$(srcdir)/Makefile.in
+ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
+am__aclocal_m4_deps = $(top_srcdir)/configure.in
+am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \
+	$(ACLOCAL_M4)
+mkinstalldirs = $(install_sh) -d
+CONFIG_CLEAN_FILES =
+am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`;
+am__vpath_adj = case $$p in \
+    $(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \
+    *) f=$$p;; \
+  esac;
+am__strip_dir = `echo $$p | sed -e 's|^.*/||'`;
+am__installdirs = "$(DESTDIR)$(libdir)" "$(DESTDIR)$(includedir)"
+libLTLIBRARIES_INSTALL = $(INSTALL)
+LTLIBRARIES = $(lib_LTLIBRARIES)
+libamq_corelib_la_DEPENDENCIES =
+am_libamq_corelib_la_OBJECTS = BlockingMessageConsumer.lo \
+	BlockingMessageConsumerRef.lo CoreLib.lo CoreLibImpl.lo \
+	ExceptionCallback.lo Lock.lo LogLevel.lo MessageConsumer.lo \
+	MessageConsumerRef.lo NonBlockingMessageConsumer.lo \
+	NonBlockingMessageConsumerRef.lo StompMessage.lo \
+	TextMessage.lo UUIDGenerator.lo Destination.lo PrimitiveMap.lo \
+	Sem.lo
+libamq_corelib_la_OBJECTS = $(am_libamq_corelib_la_OBJECTS)
+DEFAULT_INCLUDES = -I. -I$(srcdir)
+depcomp = $(SHELL) $(top_srcdir)/depcomp
+am__depfiles_maybe = depfiles
+CXXCOMPILE = $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) \
+	$(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS)
+LTCXXCOMPILE = $(LIBTOOL) --tag=CXX --mode=compile $(CXX) $(DEFS) \
+	$(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) \
+	$(AM_CXXFLAGS) $(CXXFLAGS)
+CXXLD = $(CXX)
+CXXLINK = $(LIBTOOL) --tag=CXX --mode=link $(CXXLD) $(AM_CXXFLAGS) \
+	$(CXXFLAGS) $(AM_LDFLAGS) $(LDFLAGS) -o $@
+SOURCES = $(libamq_corelib_la_SOURCES)
+DIST_SOURCES = $(libamq_corelib_la_SOURCES)
+includeHEADERS_INSTALL = $(INSTALL_HEADER)
+HEADERS = $(include_HEADERS)
+ETAGS = etags
+CTAGS = ctags
+DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST)
+ACLOCAL = @ACLOCAL@
+AMDEP_FALSE = @AMDEP_FALSE@
+AMDEP_TRUE = @AMDEP_TRUE@
+AMTAR = @AMTAR@
+AR = @AR@
+AUTOCONF = @AUTOCONF@
+AUTOHEADER = @AUTOHEADER@
+AUTOMAKE = @AUTOMAKE@
+AWK = @AWK@
+CC = @CC@
+CCDEPMODE = @CCDEPMODE@
+CFLAGS = @CFLAGS@
+CPP = @CPP@
+CPPFLAGS = @CPPFLAGS@
+CXX = @CXX@
+CXXCPP = @CXXCPP@
+CXXDEPMODE = @CXXDEPMODE@
+CXXFLAGS = @CXXFLAGS@
+CYGPATH_W = @CYGPATH_W@
+DEFS = @DEFS@
+DEPDIR = @DEPDIR@
+ECHO = @ECHO@
+ECHO_C = @ECHO_C@
+ECHO_N = @ECHO_N@
+ECHO_T = @ECHO_T@
+EGREP = @EGREP@
+EXEEXT = @EXEEXT@
+F77 = @F77@
+FFLAGS = @FFLAGS@
+INSTALL_DATA = @INSTALL_DATA@
+INSTALL_PROGRAM = @INSTALL_PROGRAM@
+INSTALL_SCRIPT = @INSTALL_SCRIPT@
+INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@
+LDFLAGS = @LDFLAGS@
+LIBOBJS = @LIBOBJS@
+LIBS = @LIBS@
+LIBTOOL = @LIBTOOL@
+LN_S = @LN_S@
+LTLIBOBJS = @LTLIBOBJS@
+MAKEINFO = @MAKEINFO@
+OBJEXT = @OBJEXT@
+PACKAGE = @PACKAGE@
+PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@
+PACKAGE_NAME = @PACKAGE_NAME@
+PACKAGE_STRING = @PACKAGE_STRING@
+PACKAGE_TARNAME = @PACKAGE_TARNAME@
+PACKAGE_VERSION = @PACKAGE_VERSION@
+PATH_SEPARATOR = @PATH_SEPARATOR@
+RANLIB = @RANLIB@
+SET_MAKE = @SET_MAKE@
+SHELL = @SHELL@
+STRIP = @STRIP@
+VERSION = @VERSION@
+ac_ct_AR = @ac_ct_AR@
+ac_ct_CC = @ac_ct_CC@
+ac_ct_CXX = @ac_ct_CXX@
+ac_ct_F77 = @ac_ct_F77@
+ac_ct_RANLIB = @ac_ct_RANLIB@
+ac_ct_STRIP = @ac_ct_STRIP@
+am__fastdepCC_FALSE = @am__fastdepCC_FALSE@
+am__fastdepCC_TRUE = @am__fastdepCC_TRUE@
+am__fastdepCXX_FALSE = @am__fastdepCXX_FALSE@
+am__fastdepCXX_TRUE = @am__fastdepCXX_TRUE@
+am__include = @am__include@
+am__leading_dot = @am__leading_dot@
+am__quote = @am__quote@
+am__tar = @am__tar@
+am__untar = @am__untar@
+bindir = @bindir@
+build = @build@
+build_alias = @build_alias@
+build_cpu = @build_cpu@
+build_os = @build_os@
+build_vendor = @build_vendor@
+datadir = @datadir@
+exec_prefix = @exec_prefix@
+host = @host@
+host_alias = @host_alias@
+host_cpu = @host_cpu@
+host_os = @host_os@
+host_vendor = @host_vendor@
+includedir = @includedir@
+infodir = @infodir@
+install_sh = @install_sh@
+libdir = @libdir@
+libexecdir = @libexecdir@
+localstatedir = @localstatedir@
+mandir = @mandir@
+mkdir_p = @mkdir_p@
+oldincludedir = @oldincludedir@
+prefix = @prefix@
+program_transform_name = @program_transform_name@
+sbindir = @sbindir@
+sharedstatedir = @sharedstatedir@
+sysconfdir = @sysconfdir@
+target_alias = @target_alias@
+INCLUDES = -I../ -I../marshal
+lib_LTLIBRARIES = libamq_corelib.la
+libamq_corelib_la_SOURCES = BlockingMessageConsumer.cpp BlockingMessageConsumerRef.cpp CoreLib.cpp CoreLibImpl.cpp ExceptionCallback.cpp Lock.cpp LogLevel.cpp MessageConsumer.cpp MessageConsumerRef.cpp NonBlockingMessageConsumer.cpp NonBlockingMessageConsumerRef.cpp StompMessage.cpp TextMessage.cpp UUIDGenerator.cpp Destination.cpp PrimitiveMap.cpp Sem.cpp
+libamq_corelib_la_LIBADD = -L../marshal -lamq_marshal -L../command -lamq_command
+include_HEADERS = BlockingMessageConsumer.h BlockingMessageConsumerRef.h Buffer.h BytesMessage.h CoreLib.h CoreLibImpl_.h Destination.h ExceptionCallback.h Exception.h Lock.h Logger.h LogLevel.h MessageConsumer.h MessageConsumerRef.h Message.h NonBlockingMessageConsumer.h NonBlockingMessageConsumerRef.h NullLogger.h RCSID.h StompMessage.h TextMessage.h UUIDGenerator.h PrimitiveMap.h Sem.h
+all: all-am
+
+.SUFFIXES:
+.SUFFIXES: .cpp .lo .o .obj
+$(srcdir)/Makefile.in:  $(srcdir)/Makefile.am  $(am__configure_deps)
+	@for dep in $?; do \
+	  case '$(am__configure_deps)' in \
+	    *$$dep*) \
+	      cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh \
+		&& exit 0; \
+	      exit 1;; \
+	  esac; \
+	done; \
+	echo ' cd $(top_srcdir) && $(AUTOMAKE) --foreign  amq_corelib/Makefile'; \
+	cd $(top_srcdir) && \
+	  $(AUTOMAKE) --foreign  amq_corelib/Makefile
+.PRECIOUS: Makefile
+Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status
+	@case '$?' in \
+	  *config.status*) \
+	    cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \
+	  *) \
+	    echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe)'; \
+	    cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe);; \
+	esac;
+
+$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES)
+	cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+
+$(top_srcdir)/configure:  $(am__configure_deps)
+	cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+$(ACLOCAL_M4):  $(am__aclocal_m4_deps)
+	cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+install-libLTLIBRARIES: $(lib_LTLIBRARIES)
+	@$(NORMAL_INSTALL)
+	test -z "$(libdir)" || $(mkdir_p) "$(DESTDIR)$(libdir)"
+	@list='$(lib_LTLIBRARIES)'; for p in $$list; do \
+	  if test -f $$p; then \
+	    f=$(am__strip_dir) \
+	    echo " $(LIBTOOL) --mode=install $(libLTLIBRARIES_INSTALL) $(INSTALL_STRIP_FLAG) '$$p' '$(DESTDIR)$(libdir)/$$f'"; \
+	    $(LIBTOOL) --mode=install $(libLTLIBRARIES_INSTALL) $(INSTALL_STRIP_FLAG) "$$p" "$(DESTDIR)$(libdir)/$$f"; \
+	  else :; fi; \
+	done
+
+uninstall-libLTLIBRARIES:
+	@$(NORMAL_UNINSTALL)
+	@set -x; list='$(lib_LTLIBRARIES)'; for p in $$list; do \
+	  p=$(am__strip_dir) \
+	  echo " $(LIBTOOL) --mode=uninstall rm -f '$(DESTDIR)$(libdir)/$$p'"; \
+	  $(LIBTOOL) --mode=uninstall rm -f "$(DESTDIR)$(libdir)/$$p"; \
+	done
+
+clean-libLTLIBRARIES:
+	-test -z "$(lib_LTLIBRARIES)" || rm -f $(lib_LTLIBRARIES)
+	@list='$(lib_LTLIBRARIES)'; for p in $$list; do \
+	  dir="`echo $$p | sed -e 's|/[^/]*$$||'`"; \
+	  test "$$dir" != "$$p" || dir=.; \
+	  echo "rm -f \"$${dir}/so_locations\""; \
+	  rm -f "$${dir}/so_locations"; \
+	done
+libamq_corelib.la: $(libamq_corelib_la_OBJECTS) $(libamq_corelib_la_DEPENDENCIES) 
+	$(CXXLINK) -rpath $(libdir) $(libamq_corelib_la_LDFLAGS) $(libamq_corelib_la_OBJECTS) $(libamq_corelib_la_LIBADD) $(LIBS)
+
+mostlyclean-compile:
+	-rm -f *.$(OBJEXT)
+
+distclean-compile:
+	-rm -f *.tab.c
+
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BlockingMessageConsumer.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BlockingMessageConsumerRef.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CoreLib.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CoreLibImpl.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Destination.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ExceptionCallback.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Lock.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/LogLevel.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/MessageConsumer.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/MessageConsumerRef.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/NonBlockingMessageConsumer.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/NonBlockingMessageConsumerRef.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PrimitiveMap.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Sem.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/StompMessage.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/TextMessage.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/UUIDGenerator.Plo@am__quote@
+
+.cpp.o:
+@am__fastdepCXX_TRUE@	if $(CXXCOMPILE) -MT $@ -MD -MP -MF "$(DEPDIR)/$*.Tpo" -c -o $@ $<; \
+@am__fastdepCXX_TRUE@	then mv -f "$(DEPDIR)/$*.Tpo" "$(DEPDIR)/$*.Po"; else rm -f "$(DEPDIR)/$*.Tpo"; exit 1; fi
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@	source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCXX_FALSE@	$(CXXCOMPILE) -c -o $@ $<
+
+.cpp.obj:
+@am__fastdepCXX_TRUE@	if $(CXXCOMPILE) -MT $@ -MD -MP -MF "$(DEPDIR)/$*.Tpo" -c -o $@ `$(CYGPATH_W) '$<'`; \
+@am__fastdepCXX_TRUE@	then mv -f "$(DEPDIR)/$*.Tpo" "$(DEPDIR)/$*.Po"; else rm -f "$(DEPDIR)/$*.Tpo"; exit 1; fi
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@	source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCXX_FALSE@	$(CXXCOMPILE) -c -o $@ `$(CYGPATH_W) '$<'`
+
+.cpp.lo:
+@am__fastdepCXX_TRUE@	if $(LTCXXCOMPILE) -MT $@ -MD -MP -MF "$(DEPDIR)/$*.Tpo" -c -o $@ $<; \
+@am__fastdepCXX_TRUE@	then mv -f "$(DEPDIR)/$*.Tpo" "$(DEPDIR)/$*.Plo"; else rm -f "$(DEPDIR)/$*.Tpo"; exit 1; fi
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@	source='$<' object='$@' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCXX_FALSE@	$(LTCXXCOMPILE) -c -o $@ $<
+
+mostlyclean-libtool:
+	-rm -f *.lo
+
+clean-libtool:
+	-rm -rf .libs _libs
+
+distclean-libtool:
+	-rm -f libtool
+uninstall-info-am:
+install-includeHEADERS: $(include_HEADERS)
+	@$(NORMAL_INSTALL)
+	test -z "$(includedir)" || $(mkdir_p) "$(DESTDIR)$(includedir)"
+	@list='$(include_HEADERS)'; for p in $$list; do \
+	  if test -f "$$p"; then d=; else d="$(srcdir)/"; fi; \
+	  f=$(am__strip_dir) \
+	  echo " $(includeHEADERS_INSTALL) '$$d$$p' '$(DESTDIR)$(includedir)/$$f'"; \
+	  $(includeHEADERS_INSTALL) "$$d$$p" "$(DESTDIR)$(includedir)/$$f"; \
+	done
+
+uninstall-includeHEADERS:
+	@$(NORMAL_UNINSTALL)
+	@list='$(include_HEADERS)'; for p in $$list; do \
+	  f=$(am__strip_dir) \
+	  echo " rm -f '$(DESTDIR)$(includedir)/$$f'"; \
+	  rm -f "$(DESTDIR)$(includedir)/$$f"; \
+	done
+
+ID: $(HEADERS) $(SOURCES) $(LISP) $(TAGS_FILES)
+	list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \
+	unique=`for i in $$list; do \
+	    if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \
+	  done | \
+	  $(AWK) '    { files[$$0] = 1; } \
+	       END { for (i in files) print i; }'`; \
+	mkid -fID $$unique
+tags: TAGS
+
+TAGS:  $(HEADERS) $(SOURCES)  $(TAGS_DEPENDENCIES) \
+		$(TAGS_FILES) $(LISP)
+	tags=; \
+	here=`pwd`; \
+	list='$(SOURCES) $(HEADERS)  $(LISP) $(TAGS_FILES)'; \
+	unique=`for i in $$list; do \
+	    if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \
+	  done | \
+	  $(AWK) '    { files[$$0] = 1; } \
+	       END { for (i in files) print i; }'`; \
+	if test -z "$(ETAGS_ARGS)$$tags$$unique"; then :; else \
+	  test -n "$$unique" || unique=$$empty_fix; \
+	  $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \
+	    $$tags $$unique; \
+	fi
+ctags: CTAGS
+CTAGS:  $(HEADERS) $(SOURCES)  $(TAGS_DEPENDENCIES) \
+		$(TAGS_FILES) $(LISP)
+	tags=; \
+	here=`pwd`; \
+	list='$(SOURCES) $(HEADERS)  $(LISP) $(TAGS_FILES)'; \
+	unique=`for i in $$list; do \
+	    if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \
+	  done | \
+	  $(AWK) '    { files[$$0] = 1; } \
+	       END { for (i in files) print i; }'`; \
+	test -z "$(CTAGS_ARGS)$$tags$$unique" \
+	  || $(CTAGS) $(CTAGSFLAGS) $(AM_CTAGSFLAGS) $(CTAGS_ARGS) \
+	     $$tags $$unique
+
+GTAGS:
+	here=`$(am__cd) $(top_builddir) && pwd` \
+	  && cd $(top_srcdir) \
+	  && gtags -i $(GTAGS_ARGS) $$here
+
+distclean-tags:
+	-rm -f TAGS ID GTAGS GRTAGS GSYMS GPATH tags
+
+distdir: $(DISTFILES)
+	@srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`; \
+	topsrcdirstrip=`echo "$(top_srcdir)" | sed 's|.|.|g'`; \
+	list='$(DISTFILES)'; for file in $$list; do \
+	  case $$file in \
+	    $(srcdir)/*) file=`echo "$$file" | sed "s|^$$srcdirstrip/||"`;; \
+	    $(top_srcdir)/*) file=`echo "$$file" | sed "s|^$$topsrcdirstrip/|$(top_builddir)/|"`;; \
+	  esac; \
+	  if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \
+	  dir=`echo "$$file" | sed -e 's,/[^/]*$$,,'`; \
+	  if test "$$dir" != "$$file" && test "$$dir" != "."; then \
+	    dir="/$$dir"; \
+	    $(mkdir_p) "$(distdir)$$dir"; \
+	  else \
+	    dir=''; \
+	  fi; \
+	  if test -d $$d/$$file; then \
+	    if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \
+	      cp -pR $(srcdir)/$$file $(distdir)$$dir || exit 1; \
+	    fi; \
+	    cp -pR $$d/$$file $(distdir)$$dir || exit 1; \
+	  else \
+	    test -f $(distdir)/$$file \
+	    || cp -p $$d/$$file $(distdir)/$$file \
+	    || exit 1; \
+	  fi; \
+	done
+check-am: all-am
+check: check-am
+all-am: Makefile $(LTLIBRARIES) $(HEADERS)
+installdirs:
+	for dir in "$(DESTDIR)$(libdir)" "$(DESTDIR)$(includedir)"; do \
+	  test -z "$$dir" || $(mkdir_p) "$$dir"; \
+	done
+install: install-am
+install-exec: install-exec-am
+install-data: install-data-am
+uninstall: uninstall-am
+
+install-am: all-am
+	@$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am
+
+installcheck: installcheck-am
+install-strip:
+	$(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
+	  install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
+	  `test -z '$(STRIP)' || \
+	    echo "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'"` install
+mostlyclean-generic:
+
+clean-generic:
+
+distclean-generic:
+	-test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES)
+
+maintainer-clean-generic:
+	@echo "This command is intended for maintainers to use"
+	@echo "it deletes files that may require special tools to rebuild."
+clean: clean-am
+
+clean-am: clean-generic clean-libLTLIBRARIES clean-libtool \
+	mostlyclean-am
+
+distclean: distclean-am
+	-rm -rf ./$(DEPDIR)
+	-rm -f Makefile
+distclean-am: clean-am distclean-compile distclean-generic \
+	distclean-libtool distclean-tags
+
+dvi: dvi-am
+
+dvi-am:
+
+html: html-am
+
+info: info-am
+
+info-am:
+
+install-data-am: install-includeHEADERS
+
+install-exec-am: install-libLTLIBRARIES
+
+install-info: install-info-am
+
+install-man:
+
+installcheck-am:
+
+maintainer-clean: maintainer-clean-am
+	-rm -rf ./$(DEPDIR)
+	-rm -f Makefile
+maintainer-clean-am: distclean-am maintainer-clean-generic
+
+mostlyclean: mostlyclean-am
+
+mostlyclean-am: mostlyclean-compile mostlyclean-generic \
+	mostlyclean-libtool
+
+pdf: pdf-am
+
+pdf-am:
+
+ps: ps-am
+
+ps-am:
+
+uninstall-am: uninstall-includeHEADERS uninstall-info-am \
+	uninstall-libLTLIBRARIES
+
+.PHONY: CTAGS GTAGS all all-am check check-am clean clean-generic \
+	clean-libLTLIBRARIES clean-libtool ctags distclean \
+	distclean-compile distclean-generic distclean-libtool \
+	distclean-tags distdir dvi dvi-am html html-am info info-am \
+	install install-am install-data install-data-am install-exec \
+	install-exec-am install-includeHEADERS install-info \
+	install-info-am install-libLTLIBRARIES install-man \
+	install-strip installcheck installcheck-am installdirs \
+	maintainer-clean maintainer-clean-generic mostlyclean \
+	mostlyclean-compile mostlyclean-generic mostlyclean-libtool \
+	pdf pdf-am ps ps-am tags uninstall uninstall-am \
+	uninstall-includeHEADERS uninstall-info-am \
+	uninstall-libLTLIBRARIES
+
+# Tell versions [3.59,3.63) of GNU make to not export all variables.
+# Otherwise a system limit (for SysV at least) may be exceeded.
+.NOEXPORT:

Added: incubator/activemq/trunk/amazon/amq_corelib/Message.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/Message.h?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/Message.h (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/Message.h Mon Jul 31 02:36:40 2006
@@ -0,0 +1,104 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#ifndef ACTIVEMQ_MESSAGE_H
+#define ACTIVEMQ_MESSAGE_H
+
+#include <vector>
+
+#include <time.h>
+
+#include "Buffer.h"
+#include "Destination.h"
+
+#include "command/CommandTypes.h"
+
+namespace ActiveMQ {
+    /// Abstract base class representing an ActiveMQ message
+    /**
+       Holds the destination and reply-to destination of a message.  The
+       specific types (ObjectMessage, BytesMessage, etc) are subclasses.
+
+       @version $Id$
+    */
+    class Message {
+    protected:
+        Message() {} // protected: only subclasses can construct a Message
+    public:
+        /// gets the type
+        /**
+           Gets the integer type of this message.
+
+           @returns the type
+        */
+        virtual int getType() const = 0;
+
+        /// gets the destination
+        /**
+           Gets the Destination that this message was/will be sent to.
+
+           @returns the stored destination, by reference (not a copy)
+        */
+        const Destination& getDestination() const { return destination_; }
+
+        /// sets the destination (d is copied into this Message)
+        void setDestination(const Destination& d) { destination_ = d; }
+
+        /// gets the reply-to destination
+        /**
+           Gets the "reply-to" Destination for this Message.
+
+           @returns the stored reply-to Destination, by reference (not a copy)
+        */
+        const Destination& getReplyTo() const { return replyTo_; }
+
+        /// sets the reply-to destination
+        /**
+           Sets the "reply-to" Destination for this Message (dest is copied).
+
+           @param dest the destination to ask for replies to this message on
+        */
+        void setReplyTo(const Destination& dest) { replyTo_ = dest; }
+
+        /// gets the time received
+        /**
+           Gets the time (in seconds since the epoch) that this
+           Message was received.
+
+           @returns time message was received
+        */
+        time_t getTimestamp() const; // NOTE(review): declared only, no body in this header and no Message.cpp in the library SOURCES - confirm where it is defined
+
+        /// marshals this message into a buffer
+        /**
+           Writes this Message's bytes into the supplied Buffer.
+
+           @param fillIn Buffer that receives the data (nothing is returned)
+        */
+        virtual void marshall(Buffer& fillIn) const = 0;
+
+        /// Destructor (virtual, so subclasses can be destroyed via Message*)
+        virtual ~Message() {};
+    private:
+        Destination destination_; // read by getDestination(), written by setDestination()
+        Destination replyTo_; // read by getReplyTo(), written by setReplyTo()
+    };
+};
+
+#endif // ACTIVEMQ_MESSAGE_H

Propchange: incubator/activemq/trunk/amazon/amq_corelib/Message.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/Message.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.cpp?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.cpp (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.cpp Mon Jul 31 02:36:40 2006
@@ -0,0 +1,38 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#include <assert.h>
+
+#include "MessageConsumer.h"
+#include "Message.h"
+#include "RCSID.h"
+
+using namespace ActiveMQ;
+
+RCSID(MessageConsumer, "$Id$");
+
+// Keeps a reference to d rather than a copy (dest_ is a reference member).
+MessageConsumer::HasDest::HasDest(const Destination& d)
+    : dest_(d) {}
+
+// Predicate: reports whether message m targets the destination in dest_.
+bool MessageConsumer::HasDest::operator()(const Message *m) {
+    assert(NULL != m);  // guard only; m is dereferenced on the next line
+    return (m->getDestination() == dest_);
+}

Propchange: incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.h?rev=427057&view=auto
==============================================================================
--- incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.h (added)
+++ incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.h Mon Jul 31 02:36:40 2006
@@ -0,0 +1,84 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#ifndef ACTIVEMQ_MSGCONSUMER_H
+#define ACTIVEMQ_MSGCONSUMER_H
+
+#include <memory>
+
+namespace ActiveMQ {
+    class Message;
+    class CoreLib;
+    class Destination;
+
+    /// Abstract base class for MessageConsumers
+    /**
+       This class provides the common interface for MessageConsumers,
+       which are objects that allow the application to receive
+       messages.  See the documentation for
+       ActiveMQ::NonBlockingMessageConsumer and
+       ActiveMQ::BlockingMessageConsumer.
+
+       Note that the application never actually deals with one of
+       these directly - the application receives MessageConsumerRef
+       objects - weak, counted references to MessageConsumers.
+
+       @version $Id$
+    */
+    class MessageConsumer {
+    public:
+        /// gets the number of messages that are ready
+        /**
+           Gets the number of waiting messages.
+
+           @returns the number
+        */
+        virtual unsigned int getNumReadyMessages() const = 0;
+
+        /// receives a message
+        /**
+           Pulls a message from the internal queue.  It is returned as
+           a std::auto_ptr to make the ownership policy clear.
+
+           @returns the new message, owned by the caller
+        */
+        virtual std::auto_ptr<Message> receive() = 0; // NOTE(review): std::auto_ptr was removed in C++17
+
+    protected:
+        friend class CoreLibImpl; // gives CoreLibImpl access to the protected members below
+        MessageConsumer() {}
+        virtual ~MessageConsumer() {};
+        virtual void enqueue(Message *msg) = 0; // NOTE(review): raw pointer - ownership of msg is not stated here; confirm in subclasses
+        virtual void removeQueued(const Destination& dest) = 0; // presumably drops queued messages for dest (see HasDest) - confirm
+
+        class HasDest { // predicate: operator() tests a Message's destination against dest_
+        public:
+            HasDest(const Destination& d);
+            bool operator()(const Message *m);
+        private:
+            const Destination &dest_; // reference, not a copy: the Destination must outlive this predicate
+        };
+
+    private:
+        MessageConsumer(const MessageConsumer& oth); // private and given no body in this header: copying is not available to users
+        MessageConsumer &operator=(const MessageConsumer& oth); // likewise for assignment
+    };
+};
+
+#endif // ACTIVEMQ_MSGCONSUMER_H

Propchange: incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/amazon/amq_corelib/MessageConsumer.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL



Mime
View raw message