activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1418089 - in /activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp: AmqpProtocolConverter.java AmqpTransport.java
Date Thu, 06 Dec 2012 21:22:22 GMT
Author: chirino
Date: Thu Dec  6 21:22:21 2012
New Revision: 1418089

URL: http://svn.apache.org/viewvc?rev=1418089&view=rev
Log:
AMQP: Clean up old commented code, pick up the trace setting from the transport filter.

Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1418089&r1=1418088&r2=1418089&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
(original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
Thu Dec  6 21:22:21 2012
@@ -65,39 +65,16 @@ class AmqpProtocolConverter {
     private static final UnsignedInteger DURABLE = new UnsignedInteger(2);
     private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
 
-    public AmqpProtocolConverter(AmqpTransport amqpTransport, BrokerContext brokerContext)
{
-        this.amqpTransport = amqpTransport;
-    }
-
-    ReentrantLock lock = new ReentrantLock();
-
-//
-//    private static final Buffer PING_RESP_FRAME = new PINGRESP().encode();
-//
-//
-//    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
-//    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
-//
-//    private final ConcurrentHashMap<ConsumerId, AmqpSubscription> subscriptionsByConsumerId
= new ConcurrentHashMap<ConsumerId, AmqpSubscription>();
-//    private final ConcurrentHashMap<UTF8Buffer, AmqpSubscription> amqpSubscriptionByTopic
= new ConcurrentHashMap<UTF8Buffer, AmqpSubscription>();
-//    private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer,
ActiveMQTopic>();
-//    private final Map<Destination, UTF8Buffer> amqpTopicMap = new LRUCache<Destination,
UTF8Buffer>();
-//    private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>();
-//    private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>();
-//
-//    private final AtomicBoolean connected = new AtomicBoolean(false);
-//    private CONNECT connect;
-//    private String clientId;
-//    private final String QOS_PROPERTY_NAME = "QoSPropertyName";
     int prefetch = 100;
-    boolean trace = true;
 
+    ReentrantLock lock = new ReentrantLock();
     TransportImpl protonTransport = new TransportImpl();
     ConnectionImpl protonConnection = new ConnectionImpl();
 
-    {
+    public AmqpProtocolConverter(AmqpTransport transport, BrokerContext brokerContext) {
+        this.amqpTransport = transport;
         this.protonTransport.bind(this.protonConnection);
-        if( trace ) {
+        if( transport.isTrace() ) {
             this.protonTransport.setProtocolTracer(new ProtocolTracer() {
                 @Override
                 public void receivedFrame(TransportFrame transportFrame) {
@@ -351,18 +328,6 @@ class AmqpProtocolConverter {
             connectionInfo.setClientId(clientId);
         }
 
-
-//        String userName = "";
-//        if (connect.userName() != null) {
-//            userName = connect.userName().toString();
-//        }
-//        String passswd = "";
-//        if (connect.password() != null) {
-//            passswd = connect.password().toString();
-//        }
-//        connectionInfo.setUserName(userName);
-//        connectionInfo.setPassword(passswd);
-
         connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
 
         sendToActiveMQ(connectionInfo, new ResponseHandler() {
@@ -1020,182 +985,6 @@ class AmqpProtocolConverter {
         return rc;
     }
 
-//    void onUnSubscribe(UNSUBSCRIBE command) {
-//        UTF8Buffer[] topics = command.topics();
-//        if (topics != null) {
-//            for (int i = 0; i < topics.length; i++) {
-//                onUnSubscribe(topics[i]);
-//            }
-//        }
-//        UNSUBACK ack = new UNSUBACK();
-//        ack.messageId(command.messageId());
-//        pumpOut(ack.encode());
-//
-//    }
-//
-//    void onUnSubscribe(UTF8Buffer topicName) {
-//        AmqpSubscription subs = amqpSubscriptionByTopic.remove(topicName);
-//        if (subs != null) {
-//            ConsumerInfo info = subs.getConsumerInfo();
-//            if (info != null) {
-//                subscriptionsByConsumerId.remove(info.getConsumerId());
-//            }
-//            RemoveInfo removeInfo = info.createRemoveCommand();
-//            sendToActiveMQ(removeInfo, null);
-//        }
-//    }
-//
-//
-//    /**
-//     * Dispatch a ActiveMQ command
-//     */
-//
-//
-//
-//    void onAMQPPublish(PUBLISH command) throws IOException, JMSException {
-//        checkConnected();
-//    }
-//
-//    void onAMQPPubAck(PUBACK command) {
-//        short messageId = command.messageId();
-//        MessageAck ack;
-//        synchronized (consumerAcks) {
-//            ack = consumerAcks.remove(messageId);
-//        }
-//        if (ack != null) {
-//            amqpTransport.sendToActiveMQ(ack);
-//        }
-//    }
-//
-//    void onAMQPPubRec(PUBREC commnand) {
-//        //from a subscriber - send a PUBREL in response
-//        PUBREL pubrel = new PUBREL();
-//        pubrel.messageId(commnand.messageId());
-//        pumpOut(pubrel.encode());
-//    }
-//
-//    void onAMQPPubRel(PUBREL command) {
-//        PUBREC ack;
-//        synchronized (publisherRecs) {
-//            ack = publisherRecs.remove(command.messageId());
-//        }
-//        if (ack == null) {
-//            LOG.warn("Unknown PUBREL: " + command.messageId() + " received");
-//        }
-//        PUBCOMP pubcomp = new PUBCOMP();
-//        pubcomp.messageId(command.messageId());
-//        pumpOut(pubcomp.encode());
-//    }
-//
-//    void onAMQPPubComp(PUBCOMP command) {
-//        short messageId = command.messageId();
-//        MessageAck ack;
-//        synchronized (consumerAcks) {
-//            ack = consumerAcks.remove(messageId);
-//        }
-//        if (ack != null) {
-//            amqpTransport.sendToActiveMQ(ack);
-//        }
-//    }
-//
-//
-//
-//
-//    public AmqpTransport amqpTransport {
-//        return amqpTransport;
-//    }
-//
-//
-//
-//    void configureInactivityMonitor(short heartBeat) {
-//        try {
-//
-//            int heartBeatMS = heartBeat * 1000;
-//            AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
-//            monitor.setProtocolConverter(this);
-//            monitor.setReadCheckTime(heartBeatMS);
-//            monitor.setInitialDelayTime(heartBeatMS);
-//            monitor.startMonitorThread();
-//
-//        } catch (Exception ex) {
-//            LOG.warn("Failed to start AMQP InactivityMonitor ", ex);
-//        }
-//
-//        LOG.debug(getClientId() + " AMQP Connection using heart beat of  " + heartBeat
+ " secs");
-//    }
-//
-//
-//
-//    void checkConnected() throws AmqpProtocolException {
-//        if (!connected.get()) {
-//            throw new AmqpProtocolException("Not connected.");
-//        }
-//    }
-//
-//    private String getClientId() {
-//        if (clientId == null) {
-//            if (connect != null && connect.clientId() != null) {
-//                clientId = connect.clientId().toString();
-//            }
-//        } else {
-//            clientId = "";
-//        }
-//        return clientId;
-//    }
-//
-//    private void stopTransport() {
-//        try {
-//            amqpTransport.stop();
-//        } catch (Throwable e) {
-//            LOG.debug("Failed to stop AMQP transport ", e);
-//        }
-//    }
-//
-//    ResponseHandler createResponseHandler(final PUBLISH command) {
-//
-//        if (command != null) {
-//            switch (command.qos()) {
-//                case AT_LEAST_ONCE:
-//                    return new ResponseHandler() {
-//                        public void onResponse(AmqpProtocolConverter converter, Response
response) throws IOException {
-//                            if (response.isException()) {
-//                                LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse)
response).getException());
-//                            } else {
-//                                PUBACK ack = new PUBACK();
-//                                ack.messageId(command.messageId());
-//                                converter.amqpTransport.sendToAmqp(ack.encode());
-//                            }
-//                        }
-//                    };
-//                case EXACTLY_ONCE:
-//                    return new ResponseHandler() {
-//                        public void onResponse(AmqpProtocolConverter converter, Response
response) throws IOException {
-//                            if (response.isException()) {
-//                                LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse)
response).getException());
-//                            } else {
-//                                PUBREC ack = new PUBREC();
-//                                ack.messageId(command.messageId());
-//                                synchronized (publisherRecs) {
-//                                    publisherRecs.put(command.messageId(), ack);
-//                                }
-//                                converter.amqpTransport.sendToAmqp(ack.encode());
-//                            }
-//                        }
-//                    };
-//                case AT_MOST_ONCE:
-//                    break;
-//            }
-//        }
-//        return null;
-//    }
-//
-//    private String convertAMQPToActiveMQ(String name) {
-//        String result = name.replace('#', '>');
-//        result = result.replace('+', '*');
-//        result = result.replace('/', '.');
-//        return result;
-//    }
-
     ////////////////////////////////////////////////////////////////////////////
     //
     // Implementation methods

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java?rev=1418089&r1=1418088&r2=1418089&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
(original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
Thu Dec  6 21:22:21 2012
@@ -45,4 +45,6 @@ public interface AmqpTransport {
 
     public String getRemoteAddress();
 
+    public boolean isTrace();
+
 }



Mime
View raw message