Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 20627 invoked from network); 9 Aug 2007 07:24:40 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 9 Aug 2007 07:24:40 -0000 Received: (qmail 20435 invoked by uid 500); 9 Aug 2007 07:24:39 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 20397 invoked by uid 500); 9 Aug 2007 07:24:39 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 20387 invoked by uid 99); 9 Aug 2007 07:24:39 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Aug 2007 00:24:39 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Aug 2007 07:24:38 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id BF7DB1A981C; Thu, 9 Aug 2007 00:24:17 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r564124 [1/2] - in /incubator/qpid/trunk/qpid/java: client/src/main/java/org/apache/qpidity/client/ client/src/main/java/org/apache/qpidity/impl/ client/src/main/java/org/apache/qpidity/jms/ common/src/main/java/org/apache/qpidity/ common/s... 
Date: Thu, 09 Aug 2007 07:24:04 -0000 To: qpid-commits@incubator.apache.org From: rajith@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070809072417.BF7DB1A981C@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajith Date: Thu Aug 9 00:24:02 2007 New Revision: 564124 URL: http://svn.apache.org/viewvc?view=rev&rev=564124 Log: implemented the Connection and Session API Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/Client.java incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/QpidConfig.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/amqplain/ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClientFactory.java Removed: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/QpidException.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java Thu Aug 9 00:24:02 2007 @@ -28,6 +28,17 @@ */ public interface Connection { + /** + * Establish the connection using the given parameters + * + * @param host + * @param port + * @param username + * @param password + * @throws QpidException + */ + public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException; + /** * Establish the connection with the broker identified by the provided URL. * @@ -52,7 +63,7 @@ * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. * @return A Newly created (suspended) session. */ - public Session createSession(int expiryInSeconds); + public Session createSession(long expiryInSeconds); /** * Create a DtxSession for this connection. 
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java Thu Aug 9 00:24:02 2007 @@ -17,7 +17,9 @@ */ package org.apache.qpidity.client; -import org.apache.qpidity.Header; +import java.nio.ByteBuffer; + +import org.apache.qpidity.Struct; /** * Assembles message parts. @@ -31,21 +33,21 @@ * are transferred. */ public interface MessagePartListener -{ +{ /** * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received. * * @param headers Either DeliveryProperties or ApplicationProperties */ - public void messageHeaders(Header... headers); + public void messageHeaders(Struct... headers); /** * Add the following byte array to the content of the message being received * * @param data Data to be added or streamed. */ - public void addData(byte[] data); + public void addData(ByteBuffer src); /** * Indicates that the message has been fully received. 
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java Thu Aug 9 00:24:02 2007 @@ -18,12 +18,14 @@ */ package org.apache.qpidity.client; +import java.nio.ByteBuffer; import java.util.Map; +import java.util.UUID; -import org.apache.qpidity.api.Message; -import org.apache.qpidity.Header; import org.apache.qpidity.Option; import org.apache.qpidity.RangeSet; +import org.apache.qpidity.Struct; +import org.apache.qpidity.api.Message; /** *

A session is associated with a connection. @@ -32,10 +34,12 @@ */ public interface Session { - public static final short ACQUIRE_MODE_NO_ACQUIRE = 0; - public static final short ACQUIRE_MODE_PRE_ACQUIRE = 1; - public static final short CONFIRM_MODE_REQUIRED = 1; - public static final short CONFIRM_MODE_NOT_REQUIRED = 0; + public static final short ACQUIRE_ANY_AVAILABLE_MESSAGE = 0; + public static final short ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 0; + public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 0; + public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 1; + public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 1; + public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 0; public static final short MESSAGE_FLOW_MODE_CREDIT = 0; public static final short MESSAGE_FLOW_MODE_WINDOW = 1; public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0; @@ -54,21 +58,21 @@ /** * Close this session and any associated resources. */ - public void close(); + public void sessionClose(); /** * Suspend this session resulting in interrupting the traffic with the broker. *

The session timer will start to tick in suspend. *

When a session is suspend any operation of this session and of the associated resources are unavailable. */ - public void suspend(); + public void sessionSuspend(); /** * This will resume an existing session *

Upon resume the session is attached with an underlying channel * hence making operation on this session available. */ - public void resume(); + public void sessionResume(UUID sessionId); //------------------------------------------------------ // Messaging methods @@ -92,7 +96,7 @@ * @param exchange The exchange the message is being sent. * @param msg The Message to be sent */ - public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode); + public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode); /** * Declare the beginning of a message transfer operation. This operation must @@ -117,27 +121,39 @@ * * @param exchange The exchange the message is being sent. */ - public void messageTransfer(String exchange, short confirmMode, short acquireMode); + public void messageTransfer(String destination, short confirmMode, short acquireMode); /** * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} * or to the message being sent. * - * @param headers Either DeliveryProperties or ApplicationProperties + * @param headers are Either DeliveryProperties or ApplicationProperties * @see org.apache.qpidity.DeliveryProperties */ - public void addMessageHeaders(Header... headers); + public void headers(Struct... headers); /** * Add the following byte array to the content of the message being sent. * * @param data Data to be added. - * @param off Offset from which to start reading data - * @param len Number of bytes to be read */ - public void addData(byte[] data, int off, int len); + public void data(byte[] data); + + /** + * Add the following ByteBuffer to the content of the message being sent. + * + * @param data Data to be added. + */ + public void data(ByteBuffer buf); /** + * Add the following String to the content of the message being sent. + * + * @param data Data to be added. + */ + public void data(String str); + + /** * Signals the end of data for the message. 
*/ public void endData(); @@ -258,8 +274,6 @@ * @param destination The destination to call flush on. */ public void messageFlush(String destination); - - public int getNoOfUnAckedMessages(); /** * On receipt of this method, the brokers MUST set his credit to zero for the given @@ -286,8 +300,12 @@ * and may be either discarded or moved to the broker dead letter queue. * * @param ranges Range of rejected messages. + * @param code TODO + * @param text TODO */ - public void messageReject(RangeSet ranges); + public void messageReject(RangeSet ranges, int code, String text); + + public RangeSet getRejectedMessages(); /** * Try to acquire ranges of messages hence releasing them form the queue. @@ -296,10 +314,10 @@ * message acquisition can fail. * The outcome of the acquisition is returned as an array of ranges of qcquired messages. *

This method should only be called on non-acquired messages. - * + * @param mode TODO * @param range Ranges of messages to be acquired. */ - public void messageAcquire(RangeSet ranges); + public void messageAcquire(RangeSet ranges, short mode); public RangeSet getAccquiredMessages(); Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/Client.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/Client.java?view=auto&rev=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/Client.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/Client.java Thu Aug 9 00:24:02 2007 @@ -0,0 +1,112 @@ +package org.apache.qpidity.impl; + +import java.net.URL; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.qpidity.Channel; +import org.apache.qpidity.Connection; +import org.apache.qpidity.ConnectionClose; +import org.apache.qpidity.ConnectionDelegate; +import org.apache.qpidity.MinaHandler; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.SessionDelegate; +import org.apache.qpidity.client.DtxSession; +import org.apache.qpidity.client.ExceptionListener; +import org.apache.qpidity.client.Session; + + +public class Client implements org.apache.qpidity.client.Connection +{ + private AtomicInteger _channelNo = new AtomicInteger(); + private Connection _conn; + private ExceptionListener _exceptionListner; + private final Lock _lock = new ReentrantLock(); + + public static org.apache.qpidity.client.Connection createConnection() + { + return new Client(); + } + + public void connect(String host, int port,String virtualHost,String username, String 
password) throws QpidException + { + Condition negotiationComplete = _lock.newCondition(); + _lock.lock(); + + ConnectionDelegate connectionDelegate = new ConnectionDelegate() + { + public SessionDelegate getSessionDelegate() + { + return new ClientSessionDelegate(); + } + + @Override public void connectionClose(Channel context, ConnectionClose struct) + { + _exceptionListner.onException(new QpidException("Server closed the connection: Reason " + struct.getReplyText(),struct.getReplyCode(),null)); + } + }; + + connectionDelegate.setCondition(_lock,negotiationComplete); + connectionDelegate.setUsername(username); + connectionDelegate.setPassword(password); + connectionDelegate.setVirtualHost(virtualHost); + + _conn = MinaHandler.connect(host, port,connectionDelegate); + + _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer()); + + try + { + negotiationComplete.await(); + } + catch (Exception e) + { + // + } + finally + { + _lock.unlock(); + } + } + + /* + * Until the dust settles with the URL disucssion + * I am not going to implement this. 
+ */ + public void connect(URL url) throws QpidException + { + throw new UnsupportedOperationException(); + } + + public void close() throws QpidException + { + Channel ch = _conn.getChannel(0); + ch.connectionClose(0, "client is closing", 0, 0); + //need to close the connection underneath as well + } + + public Session createSession(long expiryInSeconds) + { + Channel ch = _conn.getChannel(_channelNo.incrementAndGet()); + ClientSession ssn = new ClientSession(); + ssn.attach(ch); + ssn.sessionOpen(expiryInSeconds); + + return ssn; + } + + public DtxSession createDTXSession(int expiryInSeconds) + { + // TODO Auto-generated method stub + return null; + } + + public void setExceptionListener(ExceptionListener exceptionListner) + { + _exceptionListner = exceptionListner; + } + +} Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java Thu Aug 9 00:24:02 2007 @@ -1,216 +1,101 @@ package org.apache.qpidity.impl; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.qpidity.Option; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.ExceptionListener; import org.apache.qpidity.client.MessagePartListener; -import org.apache.qpidity.*; /** * Implements a Qpid Sesion. 
*/ -public class ClientSession implements org.apache.qpidity.client.Session +public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session { - - Map messagListeners = new HashMap(); - - public void addData(byte[] data, int off, int len) - { - // TODO Auto-generated method stub - - } - - public void addMessageHeaders(Header... headers) - { - // TODO Auto-generated method stub - - } - - public void close() - { - // TODO Auto-generated method stub - - } - - public void endData() - { - // TODO Auto-generated method stub - - } - - public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, Map arguments, Option... options) - { - // TODO Auto-generated method stub - - } - - public void exchangeDelete(String exchangeName, Option... options) + private Map _messageListeners = new HashMap(); + private ExceptionListener _exceptionListner; + private RangeSet _acquiredMessages; + private RangeSet _rejectedMessages; + private Map> _unackedMessages = new HashMap>(); + + @Override public void sessionClose() { - // TODO Auto-generated method stub - + // release all unacked messages and then issues a close + super.sessionClose(); } - + public void messageAcknowledge(RangeSet ranges) { - // TODO Auto-generated method stub - - } - - public void messageAcquire(RangeSet ranges) - { - // TODO Auto-generated method stub - } - - public void messageCancel(String destination) - { - // TODO Auto-generated method stub - - } - - public void messageFlow(String destination, short unit, long value) - { - // TODO Auto-generated method stub - - } - - public void messageFlowMode(String destination, short mode) - { - // TODO Auto-generated method stub - - } - - public void messageFlush(String destination) - { - // TODO Auto-generated method stub - } - - public void messageReject(RangeSet ranges) - { - // TODO Auto-generated method stub - - } - - public void messageRelease(RangeSet ranges) - { - // TODO Auto-generated 
method stub - - } - - public void messageStop(String destination) - { - // TODO Auto-generated method stub - + for (Range range : ranges) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("Acknowleding message for : " + super.getCommand((int) l)); + super.processed(l); + } + } } public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map filter, Option... options) { - // TODO Auto-generated method stub - + setMessageListener(destination,listener); + super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options); } public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) { - // TODO Auto-generated method stub - + // need to break it down into small pieces + super.messageTransfer(exchange, confirmMode, acquireMode); + super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); + // super.data(bytes); * + // super.endData() } - - public void messageTransfer(String exchange, short confirmMode, short acquireMode) - { - // TODO Auto-generated method stub - - } - - public void queueBind(String queueName, String exchangeName, String routingKey, Map arguments) - { - // TODO Auto-generated method stub - - } - - public void queueDeclare(String queueName, String alternateExchange, Map arguments, Option... options) - { - // TODO Auto-generated method stub - - } - - public void queueDelete(String queueName, Option... 
options) - { - // TODO Auto-generated method stub - - } - - public void queuePurge(String queueName) - { - // TODO Auto-generated method stub - - } - - public void queueUnbind(String queueName, String exchangeName, String routingKey, Map arguments) - { - // TODO Auto-generated method stub - - } - - public void resume() + + + public RangeSet getAccquiredMessages() { - // TODO Auto-generated method stub - + return _acquiredMessages; } - public void setExceptionListener(ExceptionListener exceptionListner) + public RangeSet getRejectedMessages() { - // TODO Auto-generated method stub - + return _rejectedMessages; } - + public void setMessageListener(String destination, MessagePartListener listener) { - // TODO Auto-generated method stub - + _messageListeners.put(destination, listener); } - - public void suspend() - { - // TODO Auto-generated method stub - - } - - public void sync() - { - // TODO Auto-generated method stub - - } - - public void txCommit() throws IllegalStateException + + public void setExceptionListener(ExceptionListener exceptionListner) { - // TODO Auto-generated method stub - - } - - public void txRollback() throws IllegalStateException + _exceptionListner = exceptionListner; + } + + // ugly but nessacery + + void setAccquiredMessages(RangeSet acquiredMessages) { - // TODO Auto-generated method stub - + _acquiredMessages = acquiredMessages; } - - public void txSelect() + + void setRejectedMessages(RangeSet rejectedMessages) { - // TODO Auto-generated method stub - + _rejectedMessages = rejectedMessages; } - - public RangeSet getAccquiredMessages() + + void notifyException(QpidException ex) { - // TODO Auto-generated method stub - return null; + _exceptionListner.onException(ex); } - - public int getNoOfUnAckedMessages() + + Map getMessageListerners() { - // TODO Auto-generated method stub - return 0; + return _messageListeners; } - - } Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java 
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java Thu Aug 9 00:24:02 2007 @@ -1,45 +1,79 @@ package org.apache.qpidity.impl; -import org.apache.qpidity.CommonSessionDelegate; -import org.apache.qpidity.client.Session; +import java.nio.ByteBuffer; - -public class ClientSessionDelegate extends CommonSessionDelegate -{ - - /*@Override public void messageTransfer(Session context, MessageTransfer struct) - { - MessagePartListener l = context.messagListeners.get(struct.getDestination()); - l.messageTransfer(struct.getDestination(),new Option[0]); - }*/ - - // --------------------------------------------------------------- - // Non generated methods - but would like if they are also generated. - // These methods should be called from Body and Header Handlers. - // If these methods are generated as part of the delegate then - // I can call these methods from the BodyHandler and HeaderHandler - // in a generic way - - // I have used destination to indicate my intent of receiving - // some form of correlation to know which consumer this data belongs to. 
- // It can be anything as long as I can make the right correlation - // ---------------------------------------------------------------- - /* public void data(Session context,String destination,byte[] src) throws QpidException +import org.apache.qpidity.Frame; +import org.apache.qpidity.MessageAcquired; +import org.apache.qpidity.MessageReject; +import org.apache.qpidity.MessageTransfer; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; +import org.apache.qpidity.Session; +import org.apache.qpidity.SessionDelegate; +import org.apache.qpidity.Struct; +import org.apache.qpidity.client.MessagePartListener; + + +public class ClientSessionDelegate extends SessionDelegate +{ + private MessageTransfer _currentTransfer; + private MessagePartListener _currentMessageListener; + + @Override public void data(Session ssn, Frame frame) { - MessagePartListener l = context.messagListeners.get(destination); - l.data(src); + for (ByteBuffer b : frame) + { + _currentMessageListener.addData(b); + } + if (frame.isLastSegment() && frame.isLastFrame()) + { + _currentMessageListener.messageReceived(); + } + } - public void endData(Session context,String destination) throws QpidException + @Override public void headers(Session ssn, Struct... headers) { - MessagePartListener l = context.messagListeners.get(destination); - l.endData(); + _currentMessageListener.messageHeaders(headers); } - public void messageHeaders(Session context,String destination,Header... 
headers) throws QpidException - { - MessagePartListener l = context.messagListeners.get(destination); - l.endData(); - }*/ + @Override public void messageTransfer(Session session, MessageTransfer currentTransfer) + { + _currentTransfer = currentTransfer; + _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination()); + + //a better way is to tell the broker to stop the transfer + if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1) + { + RangeSet transfers = new RangeSet(); + transfers.add(_currentTransfer.getId()); + session.messageRelease(transfers); + } + } + + // -------------------------------------------- + // Message methods + // -------------------------------------------- + + + @Override public void messageReject(Session session, MessageReject struct) + { + for (Range range : struct.getTransfers()) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("message rejected: " + + session.getCommand((int) l)); + } + } + ((ClientSession)session).setRejectedMessages(struct.getTransfers()); + ((ClientSession)session).notifyException(new QpidException("Message Rejected",0,null)); + } + + @Override public void messageAcquired(Session session, MessageAcquired struct) + { + ((ClientSession)session).setAccquiredMessages(struct.getTransfers()); + } } Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java?view=auto&rev=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java Thu Aug 9 00:24:02 2007 @@ -0,0 +1,45 @@ +package org.apache.qpidity.impl; + +import 
org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.client.ExceptionListener; +import org.apache.qpidity.client.Session; +import org.apache.qpidity.client.Connection; + +public class DemoClient +{ + + public static final void main(String[] args) + { + Connection conn = Client.createConnection(); + try{ + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); + }catch(Exception e){ + e.printStackTrace(); + } + + Session ssn = conn.createSession(50000); + ssn.setExceptionListener(new ExceptionListener() + { + public void onException(QpidException e) + { + System.out.println(e); + } + }); + ssn.queueDeclare("Queue1", null, null); + ssn.sync(); + + ssn.messageTransfer("Queue1", (short) 0, (short) 1); + ssn.headers(new DeliveryProperties(), + new MessageProperties()); + ssn.data("this is the data"); + ssn.endData(); + + ssn.messageTransfer("Queue2", (short) 0, (short) 1); + ssn.data("this should be rejected"); + ssn.endData(); + ssn.sync(); + } + +} Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java Thu Aug 9 00:24:02 2007 @@ -1,33 +1,109 @@ package org.apache.qpidity.impl; -import org.apache.qpidity.Header; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import 
org.apache.qpidity.Struct; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.MessageListener; import org.apache.qpidity.client.MessagePartListener; +/** + * + * Will call onMessage method as soon as data is avialable + * The client can then start to process the data while + * the rest of the data is read. + * + */ public class MessagePartListenerAdapter implements MessagePartListener { MessageListener _adaptee; Message _currentMsg; + DeliveryProperties _currentDeliveryProps; + MessageProperties _currentMessageProps; public MessagePartListenerAdapter(MessageListener listener) { _adaptee = listener; - _currentMsg = null; - } + + // temp solution. + _currentMsg = new Message() + { + Queue _data = new LinkedList(); + ByteBuffer _readBuffer; + private int dataSize; + + public void appendData(byte[] src) + { + appendData(ByteBuffer.wrap(src)); + } - public void addData(byte[] src) - { - _currentMsg.appendData(src); - } + public void appendData(ByteBuffer src) + { + _data.offer(src); + dataSize += src.remaining(); + } + + public DeliveryProperties getDeliveryProperties() + { + return _currentDeliveryProps; + } + + public MessageProperties getMessageProperties() + { + return _currentMessageProps; + } - public void messageHeaders(Header... headers) + // since we provide the message only after completion + // we can assume that when this method is called we have + // received all data. + public void readData(byte[] target) + { + if (_readBuffer == null) + { + buildReadBuffer(); + } + + _readBuffer.get(target); + } + + private void buildReadBuffer() + { + _readBuffer = ByteBuffer.allocate(dataSize); + for(ByteBuffer buf:_data) + { + _readBuffer.put(buf); + } + } + }; + } + + public void addData(ByteBuffer src) + { + _currentMsg.appendData(src); + } + + public void messageHeaders(Struct... 
headers) { - //_currentMsg add the headers + for(Struct struct: headers) + { + if(struct instanceof DeliveryProperties) + { + _currentDeliveryProps = (DeliveryProperties)struct; + } + else if (struct instanceof MessageProperties) + { + _currentMessageProps = (MessageProperties)struct; + } + } } - + public void messageReceived() { - _adaptee.onMessage(_currentMsg); + _adaptee.onMessage(_currentMsg); } } Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java Thu Aug 9 00:24:02 2007 @@ -34,7 +34,7 @@ { if (exception instanceof QpidException) { - jmsException = new JMSException(exception.getMessage(), ((QpidException) exception).getErrorCode()); + jmsException = new JMSException(exception.getMessage(), String.valueOf(((QpidException) exception).getErrorCode())); } else { @@ -51,7 +51,7 @@ static public XAException convertQpidExceptionToXAException(QpidException exception) { - String qpidErrorCode = exception.getErrorCode(); + String qpidErrorCode = String.valueOf(exception.getErrorCode()); // todo map this error to an XA code int xaCode = XAException.XAER_PROTO; return new XAException(xaCode); Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java Thu Aug 9 00:24:02 2007 @@ -131,9 +131,9 @@ // this is a queue we expect that this queue exists getSession().getQpidSession() .messageSubscribe(destination.getName(), getMessageActorID(), - org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // When the message selctor is set we do not acquire the messages - _messageSelector != null ? org.apache.qpidity.client.Session.ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.ACQUIRE_MODE_PRE_ACQUIRE, + _messageSelector != null ? org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION); if (_messageSelector != null) { @@ -167,9 +167,9 @@ // subscribe to this topic getSession().getQpidSession() .messageSubscribe(queueName, getMessageActorID(), - org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // We always acquire the messages - org.apache.qpidity.client.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, + org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION, // Request exclusive subscription access, meaning only this subscription // can access the queue. @@ -591,7 +591,7 @@ // TODO: messageID is a string but range need a long??? 
// ranges.add(message.getMessageID()); - getSession().getQpidSession().messageAcquire(ranges); + getSession().getQpidSession().messageAcquire(ranges, org.apache.qpidity.client.Session.ACQUIRE_ANY_AVAILABLE_MESSAGE); RangeSet acquired = getSession().getQpidSession().getAccquiredMessages(); if (acquired.size() > 0) { Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java Thu Aug 9 00:24:02 2007 @@ -94,9 +94,9 @@ // this is a queue we expect that this queue exists getSession().getQpidSession() .messageSubscribe(queue.getQueueName(), getMessageActorID(), - org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // We do not acquire those messages - org.apache.qpidity.client.Session.ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null); + org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null); } Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java Thu Aug 9 00:24:02 
2007 @@ -384,7 +384,7 @@ _incomingAsynchronousMessages.notifyAll(); } // close the underlaying QpidSession - _qpidSession.close(); + _qpidSession.sessionClose(); } } Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java Thu Aug 9 00:24:02 2007 @@ -35,30 +35,31 @@ * @author Rafael H. Schloming */ -class Channel extends Invoker implements Handler +public class Channel extends Invoker implements Handler { final private Connection connection; final private int channel; final private TrackSwitch tracks; final private Delegate delegate; - + final private SessionDelegate sessionDelegate; // session may be null private Session session; private Method method = null; private List data = null; private int dataSize; - + public Channel(Connection connection, int channel, SessionDelegate delegate) { this.connection = connection; this.channel = channel; this.delegate = new ChannelDelegate(); - + this.sessionDelegate = delegate; + tracks = new TrackSwitch(); tracks.map(L1, new MethodHandler - (getMajor(), getMinor(), this.delegate)); + (getMajor(), getMinor(), connection.getConnectionDelegate())); tracks.map(L2, new MethodHandler (getMajor(), getMinor(), this.delegate)); tracks.map(L3, new SessionResolver Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java?view=diff&rev=564124&r1=564123&r2=564124 
============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java Thu Aug 9 00:24:02 2007 @@ -36,7 +36,8 @@ * short instead of Short */ -class Connection implements ProtocolActions +// RA making this public until we sort out the package issues +public class Connection implements ProtocolActions { final private Handler input; @@ -58,6 +59,11 @@ this.delegate = delegate; } + public ConnectionDelegate getConnectionDelegate() + { + return delegate; + } + public Connection(Handler output, ConnectionDelegate delegate) { @@ -103,6 +109,9 @@ output.handle(header.toByteBuffer()); // XXX: how do we close the connection? } + + // not sure if this is the right place + getChannel(0).connectionStart(header.getMajor(), header.getMinor(), null, "PLAIN", "utf8"); } public Channel getChannel(int number) Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java Thu Aug 9 00:24:02 2007 @@ -20,6 +20,17 @@ */ package org.apache.qpidity; +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + /** * 
ConnectionDelegate @@ -27,9 +38,240 @@ * @author Rafael H. Schloming */ -public interface ConnectionDelegate +/** + * Currently only implemented client specific methods + * the server specific methods are dummy impls for testing + * + * the connectionClose is kind of different for both sides + */ +public abstract class ConnectionDelegate extends Delegate { + private String _username; + private String _password; + private String _mechanism; + private String _virtualHost; + private SaslClient saslClient; + private SaslServer saslServer; + private String _locale = "utf8"; + private int maxFrame = 64*1024; + private Condition _negotiationComplete; + private Lock _negotiationCompleteLock; + + public abstract SessionDelegate getSessionDelegate(); + + public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete) + { + _negotiationComplete = negotiationComplete; + _negotiationCompleteLock = negotiationCompleteLock; + } + + // ---------------------------------------------- + // Client side + //----------------------------------------------- + @Override public void connectionStart(Channel context, ConnectionStart struct) + { + System.out.println("The broker has sent connection-start"); + + String mechanism = null; + String response = null; + try + { + mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms()); + saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null, + SecurityHelper.createCallbackHandler(mechanism,_username,_password )); + response = new String(saslClient.evaluateChallenge(new byte[0]),_locale); + } + catch (UnsupportedEncodingException e) + { + // need error handling + } + catch (SaslException e) + { + // need error handling + } + catch (QpidException e) + { + // need error handling + } + + Map props = new HashMap(); + context.connectionStartOk(props, mechanism, response, _locale); + } + + @Override public void connectionSecure(Channel context, ConnectionSecure struct) + { + 
System.out.println("The broker has sent connection-secure with chanllenge " + struct.getChallenge()); + + try + { + String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale); + context.connectionSecureOk(response); + } + catch (UnsupportedEncodingException e) + { + // need error handling + } + catch (SaslException e) + { + // need error handling + } + } + + @Override public void connectionTune(Channel context, ConnectionTune struct) + { + System.out.println("The broker has sent connection-tune " + struct.toString()); + + // should update the channel max given by the broker. + context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat()); + context.connectionOpen(_virtualHost, null, Option.INSIST); + } + + + @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct) + { + String knownHosts = struct.getKnownHosts(); + System.out.println("The broker has opened the connection for use"); + System.out.println("The broker supplied the following hosts for failover " + knownHosts); + _negotiationCompleteLock.lock(); + try + { + _negotiationComplete.signalAll(); + } + finally + { + _negotiationCompleteLock.unlock(); + } + } + + public void connectionRedirect(Channel context, ConnectionRedirect struct) + { + // not going to bother at the moment + } + + // ---------------------------------------------- + // Server side + //----------------------------------------------- + @Override public void connectionStartOk(Channel context, ConnectionStartOk struct) + { + //set the client side locale on the server side + _locale = struct.getLocale(); + _mechanism = struct.getMechanism(); + + System.out.println("The client has sent connection-start-ok"); + + //try + //{ + //saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); + //byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes()); + 
byte[] challenge = null; + if ( challenge == null) + { + System.out.println("Authentication sucessfull"); + context.connectionTune(Integer.MAX_VALUE,maxFrame, 0); + } + else + { + System.out.println("Authentication failed"); + try + { + context.connectionSecure(new String(challenge,_locale)); + } + catch(Exception e) + { + + } + } + + + /*} + catch (SaslException e) + { + // need error handling + } + catch (QpidException e) + { + // need error handling + }*/ + } + + @Override public void connectionTuneOk(Channel context, ConnectionTuneOk struct) + { + System.out.println("The client has excepted the tune params"); + } + + @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct) + { + System.out.println("The client has sent connection-secure-ok"); + try + { + saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); + byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes()); + if ( challenge == null) + { + System.out.println("Authentication sucessfull"); + context.connectionTune(Integer.MAX_VALUE,maxFrame, 0); + } + else + { + System.out.println("Authentication failed"); + try + { + context.connectionSecure(new String(challenge,_locale)); + } + catch(Exception e) + { + + } + } + + + } + catch (SaslException e) + { + // need error handling + } + catch (QpidException e) + { + // need error handling + } + } + + + @Override public void connectionOpen(Channel context, ConnectionOpen struct) + { + String hosts = "amqp:1223243232325"; + System.out.println("The client has sent connection-open-ok"); + context.connectionOpenOk(hosts); + } + + + public String getPassword() + { + return _password; + } + + public void setPassword(String password) + { + _password = password; + } + + public String getUsername() + { + return _username; + } + + public void setUsername(String username) + { + _username = username; + } - SessionDelegate 
getSessionDelegate(); + public String getVirtualHost() + { + return _virtualHost; + } + public void setVirtualHost(String host) + { + _virtualHost = host; + } } Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java Thu Aug 9 00:24:02 2007 @@ -35,7 +35,8 @@ * @author Rafael H. Schloming */ -class Frame implements Iterable +// RA: changed it to public until we sort the package issues +public class Frame implements Iterable { public static final int HEADER_SIZE = 12; Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java Thu Aug 9 00:24:02 2007 @@ -42,8 +42,8 @@ * * @author Rafael H. 
Schloming */ - -class MinaHandler implements IoHandler +//RA making this public until we sort out the package issues +public class MinaHandler implements IoHandler { private final ConnectionDelegate delegate; @@ -124,7 +124,8 @@ { IoAcceptor acceptor = new SocketAcceptor(); acceptor.bind(new InetSocketAddress(host, port), - new MinaHandler(delegate, InputHandler.State.PROTO_HDR)); + new MinaHandler(delegate, InputHandler.State.PROTO_HDR)); + } public static final Connection connect(String host, int port, Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java Thu Aug 9 00:24:02 2007 @@ -29,7 +29,9 @@ * @author Rafael H. Schloming */ -class ProtocolHeader +//RA making this public until we sort out the package issues + +public class ProtocolHeader { private static final byte[] AMQP = {'A', 'M', 'Q', 'P' }; Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/QpidConfig.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/QpidConfig.java?view=auto&rev=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/QpidConfig.java (added) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/QpidConfig.java Thu Aug 9 00:24:02 2007 @@ -0,0 +1,90 @@ +package org.apache.qpidity; + +/** + * API to configure the Security parameters of the client. 
+ * The user can choose to pick the config from any source + * and set it using this class. + * + */ +public class QpidConfig +{ + private static QpidConfig _instance = new QpidConfig(); + + private SecurityMechanism[] securityMechanisms = + new SecurityMechanism[]{new SecurityMechanism("PLAIN","org.apache.qpidity.security.UsernamePasswordCallbackHandler"), + new SecurityMechanism("CRAM_MD5","org.apache.qpidity.security.UsernamePasswordCallbackHandler")}; + + private SaslClientFactory[] saslClientFactories = + new SaslClientFactory[]{new SaslClientFactory("AMQPLAIN","org.apache.qpidity.security.amqplain.AmqPlainSaslClientFactory")}; + + private QpidConfig(){} + + public static QpidConfig get() + { + return _instance; + } + + public void setSecurityMechanisms(SecurityMechanism... securityMechanisms) + { + this.securityMechanisms = securityMechanisms; + } + + public SecurityMechanism[] getSecurityMechanisms() + { + return securityMechanisms; + } + + public void setSaslClientFactories(SaslClientFactory... 
saslClientFactories) + { + this.saslClientFactories = saslClientFactories; + } + + public SaslClientFactory[] getSaslClientFactories() + { + return saslClientFactories; + } + + public class SecurityMechanism + { + String type; + String handler; + + SecurityMechanism(String type,String handler) + { + this.type = type; + this.handler = handler; + } + + public String getHandler() + { + return handler; + } + + public String getType() + { + return type; + } + } + + public class SaslClientFactory + { + String type; + String factoryClass; + + SaslClientFactory(String type,String factoryClass) + { + this.type = type; + this.factoryClass = factoryClass; + } + + public String getFactoryClass() + { + return factoryClass; + } + + public String getType() + { + return type; + } + } +} Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/QpidException.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/QpidException.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/QpidException.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/QpidException.java Thu Aug 9 00:24:02 2007 @@ -25,12 +25,9 @@ public class QpidException extends Exception { /** - * This exception error code. - *

This error code is used for internationalisation purpose. - *

This error code is set from the AMQP ones. - * So we may want to use the AMQP error code directly. + * AMQP error code */ - private String _errorCode; + private int _errorCode; /** * Constructor for a Qpid Exception. @@ -38,20 +35,27 @@ * they are unknown. * @param message A description of the reason of this exception . * @param errorCode A string specifyin the error code of this exception. - * @param cause The linked Execption. + * @param cause The linked Execption. * + * */ - public QpidException(String message, String errorCode, Throwable cause) + public QpidException(String message, int errorCode, Throwable cause) { super(message, cause); _errorCode = errorCode; } + + //hack to get rid of a compile error from a generated class + public QpidException(String message, String errorCode, Throwable cause) + { + + } /** * Get this execption error code. * * @return This exception error code. */ - public String getErrorCode() + public int getErrorCode() { return _errorCode; } Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java?view=auto&rev=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java (added) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java Thu Aug 9 00:24:02 2007 @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.io.UnsupportedEncodingException; +import java.util.HashSet; +import java.util.StringTokenizer; + +import org.apache.qpidity.security.AMQPCallbackHandler; +import org.apache.qpidity.security.CallbackHandlerRegistry; + +public class SecurityHelper +{ + public static String chooseMechanism(String mechanisms) throws UnsupportedEncodingException + { + StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); + HashSet mechanismSet = new HashSet(); + while (tokenizer.hasMoreTokens()) + { + mechanismSet.add(tokenizer.nextToken()); + } + + String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); + StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " "); + while (prefTokenizer.hasMoreTokens()) + { + String mech = prefTokenizer.nextToken(); + if (mechanismSet.contains(mech)) + { + return mech; + } + } + return null; + } + + public static AMQPCallbackHandler createCallbackHandler(String mechanism, String username,String password) + throws QpidException + { + Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism); + try + { + Object instance = mechanismClass.newInstance(); + AMQPCallbackHandler cbh = (AMQPCallbackHandler) instance; + cbh.initialise(username,password); + return cbh; + } + catch (Exception e) + { + throw new QpidException("Unable to create callback handler: " + e,0, e.getCause()); + } + } + +} Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java Thu Aug 9 00:24:02 2007 @@ -42,11 +42,12 @@ // completed incoming commands private final RangeSet processed = new RangeSet(); private Range syncPoint = null; - + // outgoing command count private long commandsOut = 0; private Map commands = new HashMap(); private long mark = 0; + public Map getOutstandingCommands() { @@ -231,7 +232,6 @@ } future.set(result); } - protected Future invoke(Method m, Class klass) { long command = commandsOut; Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java Thu Aug 9 00:24:02 2007 @@ -190,13 +190,20 @@ { final Map> queues = new HashMap>(); - MinaHandler.accept("0.0.0.0", 5672, new ConnectionDelegate() - { - public SessionDelegate getSessionDelegate() - { - return new ToyBroker(queues); - } - }); + + ConnectionDelegate delegate = new ConnectionDelegate() + { + public SessionDelegate getSessionDelegate() + { + return new ToyBroker(queues); + } + }; + + //hack + delegate.setUsername("guest"); + delegate.setPassword("guest"); + + MinaHandler.accept("0.0.0.0", 5672, delegate); } } Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java Thu Aug 9 00:24:02 2007 @@ -1,5 +1,7 @@ package org.apache.qpidity.api; +import java.nio.ByteBuffer; + import org.apache.qpidity.MessageProperties; import org.apache.qpidity.DeliveryProperties; @@ -43,6 +45,8 @@ */ public void appendData(byte[] src); + public void appendData(ByteBuffer src); + /** * This will abstract the underlying message data. * The Message implementation may not hold all message Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java?view=diff&rev=564124&r1=564123&r2=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java Thu Aug 9 00:24:02 2007 @@ -56,7 +56,7 @@ } catch (Exception e) { - throw new QpidException("cannot evaluate property ", "message selector", e); + throw new QpidException("cannot evaluate property ", 0, e); } } return result; Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java?view=auto&rev=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java (added) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java Thu Aug 9 00:24:02 2007 @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpidity.security; + +import javax.security.auth.callback.CallbackHandler; + +public interface AMQPCallbackHandler extends CallbackHandler +{ + void initialise(String username,String password); +} Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java?view=auto&rev=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java (added) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java Thu Aug 9 00:24:02 2007 @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpidity.security; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpidity.QpidConfig; + +public class CallbackHandlerRegistry +{ + + private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry(); + + private Map _mechanismToHandlerClassMap = new HashMap(); + + private StringBuilder _mechanisms; + + public static CallbackHandlerRegistry getInstance() + { + return _instance; + } + + public Class getCallbackHandlerClass(String mechanism) + { + return _mechanismToHandlerClassMap.get(mechanism); + } + + public String getMechanisms() + { + return _mechanisms.toString(); + } + + private CallbackHandlerRegistry() + { + // first we register any Sasl client factories + DynamicSaslRegistrar.registerSaslProviders(); + registerMechanisms(); + } + + private void registerMechanisms() + { + for (QpidConfig.SecurityMechanism securityMechanism: QpidConfig.get().getSecurityMechanisms() ) + { + Class clazz = null; + try + { + clazz = Class.forName(securityMechanism.getHandler()); + if (!AMQPCallbackHandler.class.isAssignableFrom(clazz)) + { + System.out.println("SASL provider " + clazz + " does not implement " + AMQPCallbackHandler.class + + ". Skipping"); + continue; + } + _mechanismToHandlerClassMap.put(securityMechanism.getType(), clazz); + if (_mechanisms == null) + { + + _mechanisms = new StringBuilder(); + _mechanisms.append(securityMechanism.getType()); + } + else + { + _mechanisms.append(" " + securityMechanism.getType()); + } + } + catch (ClassNotFoundException ex) + { + System.out.println("Unable to load class " + securityMechanism.getHandler() + ". 
Skipping that SASL provider"); + continue; + } + } + } +} Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java?view=auto&rev=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java (added) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java Thu Aug 9 00:24:02 2007 @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpidity.security; + +import java.security.Security; +import java.util.Map; +import java.util.TreeMap; + +import javax.security.sasl.SaslClientFactory; + +import org.apache.qpidity.QpidConfig; + +public class DynamicSaslRegistrar +{ + public static void registerSaslProviders() + { + Map factories = registerSaslClientFactories(); + if (factories.size() > 0) + { + Security.addProvider(new JCAProvider(factories)); + System.out.println("Dynamic SASL provider added as a security provider"); + } + } + + private static Map registerSaslClientFactories() + { + TreeMap factoriesToRegister = + new TreeMap(); + + for (QpidConfig.SaslClientFactory factory: QpidConfig.get().getSaslClientFactories()) + { + String className = factory.getFactoryClass(); + try + { + Class clazz = Class.forName(className); + if (!(SaslClientFactory.class.isAssignableFrom(clazz))) + { + System.out.println("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping"); + continue; + } + factoriesToRegister.put(factory.getType(), clazz); + } + catch (Exception ex) + { + System.out.println("Error instantiating SaslClientFactory calss " + className + " - skipping"); + } + } + return factoriesToRegister; + } + + +} Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java?view=auto&rev=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java (added) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java Thu Aug 9 00:24:02 2007 @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/*
 * See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
import java.security.Provider;
import java.security.Security;
import java.util.Map;

/**
 * Security provider named {@code AMQSASLProvider} that publishes a set of
 * classes under {@code SaslClientFactory.<key>} properties.
 * Belongs to package {@code org.apache.qpidity.security}.
 */
public class JCAProvider extends Provider
{
    /**
     * Creates the provider, records one {@code SaslClientFactory.<key>} property
     * per map entry and adds the provider to the JVM's security providers.
     *
     * @param providerMap factory classes keyed by the name to publish them under;
     *                    entries whose class is {@code null} are ignored
     */
    public JCAProvider(Map<String, ? extends Class<?>> providerMap)
    {
        super("AMQSASLProvider", 1.0, "A JCA provider that registers all "
              + "AMQ SASL providers that want to be registered");
        register(providerMap);
        // Self-registration is kept because existing callers may rely on it.
        Security.addProvider(this);
    }

    /**
     * Stores the class name of each entry under the key
     * {@code "SaslClientFactory." + entry key}.
     */
    private void register(Map<String, ? extends Class<?>> providerMap)
    {
        for (Map.Entry<String, ? extends Class<?>> me : providerMap.entrySet())
        {
            Class<?> factoryClass = me.getValue();
            if (factoryClass == null)
            {
                // nothing to publish for this key; skip instead of failing half way through
                continue;
            }
            put("SaslClientFactory." + me.getKey(), factoryClass.getName());
        }
    }
}

/*
 * \ No newline at end of file
 * Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java
 * URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java?view=auto&rev=564124
 * ==============================================================================
 * --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java (added)
 * +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java Thu Aug 9 00:24:02 2007
 * @@ -0,0 +1,60 @@
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
+ * + */ +package org.apache.qpidity.security; + +import java.io.IOException; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; + +public class UsernamePasswordCallbackHandler implements AMQPCallbackHandler +{ + private String _username; + private String _password; + + public void initialise(String username,String password) + { + _username = username; + _password = password; + } + + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException + { + for (int i = 0; i < callbacks.length; i++) + { + Callback cb = callbacks[i]; + if (cb instanceof NameCallback) + { + ((NameCallback)cb).setName(_username); + } + else if (cb instanceof PasswordCallback) + { + ((PasswordCallback)cb).setPassword((_password).toCharArray()); + } + else + { + throw new UnsupportedCallbackException(cb); + } + } + } +} Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java?view=auto&rev=564124 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java (added) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java Thu Aug 9 00:24:02 2007 @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.security.amqplain; + +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; + +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.Callback; + +/** + * Implements the "AMQPlain" authentication protocol that uses FieldTables to send username and pwd. 
+ * + */ +public class AmqPlainSaslClient implements SaslClient +{ + /** + * The name of this mechanism + */ + public static final String MECHANISM = "AMQPLAIN"; + + private CallbackHandler _cbh; + + public AmqPlainSaslClient(CallbackHandler cbh) + { + _cbh = cbh; + } + + public String getMechanismName() + { + return "AMQPLAIN"; + } + + public boolean hasInitialResponse() + { + return true; + } + + public byte[] evaluateChallenge(byte[] challenge) throws SaslException + { + // we do not care about the prompt or the default name + NameCallback nameCallback = new NameCallback("prompt", "defaultName"); + PasswordCallback pwdCallback = new PasswordCallback("prompt", false); + Callback[] callbacks = new Callback[]{nameCallback, pwdCallback}; + try + { + _cbh.handle(callbacks); + } + catch (Exception e) + { + throw new SaslException("Error handling SASL callbacks: " + e, e); + } + FieldTable table = FieldTableFactory.newFieldTable(); + table.setString("LOGIN", nameCallback.getName()); + table.setString("PASSWORD", new String(pwdCallback.getPassword())); + return table.getDataAsBytes(); + } + + public boolean isComplete() + { + return true; + } + + public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException + { + throw new SaslException("Not supported"); + } + + public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException + { + throw new SaslException("Not supported"); + } + + public Object getNegotiatedProperty(String propName) + { + return null; + } + + public void dispose() throws SaslException + { + _cbh = null; + } +}