qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arnaudsi...@apache.org
Subject svn commit: r563197 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity: ./ jms/
Date Mon, 06 Aug 2007 17:15:57 GMT
Author: arnaudsimon
Date: Mon Aug  6 10:15:56 2007
New Revision: 563197

URL: http://svn.apache.org/viewvc?view=rev&rev=563197
Log:
Added dtx classes

Added:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
  (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
  (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
  (with props)
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/DtxSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/DtxSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/DtxSession.java?view=diff&rev=563197&r1=563196&r2=563197
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/DtxSession.java
(original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/DtxSession.java
Mon Aug  6 10:15:56 2007
@@ -18,7 +18,7 @@
  */
 package org.apache.qpidity;
 
-import org.apache.qpidity.QpidException;
+import javax.transaction.xa.Xid;
 
 /**
  * This session's resources are controlled under the scope of a distributed transaction.
@@ -27,11 +27,112 @@
 {
 
     /**
-     * Get the XA resource associated with this session.
+     * This method is called when messages should be produced and consumed on behalf of a transaction
+     * branch identified by xid.
+     * possible options are:
+     * <ul>
+     * <li> {@link Option#JOIN}:  Indicate that the start applies to joining a transaction
previously seen.
+     * <li> {@link Option#RESUME}: Indicate that the start applies to resuming a suspended
transaction branch specified.
+     * </ul>
      *
-     * @return this session XA resource.
-     * @throws QpidException If the session fails to retrieve its associated XA resource
-     *                       due to some error.
+     * @param xid     Specifies the xid of the transaction branch to be started.
+     * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}.
+     * @throws QpidException If the session fails to start due to some error
      */
-    public javax.transaction.xa.XAResource getDTXResource() throws QpidException;
+    public void dtxDemarcationStart(Xid xid, Option... options) throws QpidException;
+
+    /**
+     * This method is called when the work done on behalf of a transaction branch finishes or
needs to
+     * be suspended.
+     * possible options are:
+     * <ul>
+     * <li> {@link Option#FAIL}: indicates that this portion of work has failed;
+     * otherwise this portion of work has
+     * completed successfully.
+     * <li> {@link Option#SUSPEND}: Indicates that the transaction branch is
+     * temporarily suspended in an incomplete state.
+     * </ul>
+     *
+     * @param xid     Specifies the xid of the transaction branch to be ended.
+     * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}.
+     * @throws QpidException If the session fails to end due to some error
+     */
+    public void dtxDemarcationEnd(Xid xid, Option... options) throws QpidException;
+
+    /**
+     * Commit the work done on behalf of a transaction branch. This method commits the work
associated
+     * with xid. Any produced messages are made available and any consumed messages are discarded.
+     * possible option is:
+     * <ul>
+     * <li> {@link Option#ONE_PHASE}: When set then one-phase commit optimization is
used.
+     * </ul>
+     *
+     * @param xid     Specifies the xid of the transaction branch to be committed.
+     * @param options Available option is: {@link Option#ONE_PHASE}
+     * @throws QpidException If the session fails to commit due to some error
+     */
+    public void dtxCoordinationCommit(Xid xid, Option... options) throws QpidException;
+
+    /**
+     * This method is called to forget about a heuristically completed transaction branch.
+     *
+     * @param xid Specifies the xid of the transaction branch to be forgotten.
+     * @throws QpidException If the session fails to forget due to some error
+     */
+    public void dtxCoordinationForget(Xid xid) throws QpidException;
+
+    /**
+     * This method obtains the current transaction timeout value in seconds. If set-timeout
was not
+     * used prior to invoking this method, the return value is the default timeout; otherwise,
the
+     * value used in the previous set-timeout call is returned.
+     *
+     * @param xid Specifies the xid of the transaction branch for getting the timeout.
+     * @return The current transaction timeout value in seconds.
+     * @throws QpidException If the session fails to get the timeout due to some error
+     */
+    public long dtxCoordinationGetTimeout(Xid xid) throws QpidException;
+
+    /**
+     * This method prepares for commitment any message produced or consumed on behalf of
xid.
+     *
+     * @param xid Specifies the xid of the transaction branch that can be prepared.
+     * @return The status of the prepare operation: can be one of those:
+     *         xa-ok: Normal execution.
+     *         <p/>
+     *         xa-rdonly: The transaction branch was read-only and has been committed.
+     *         <p/>
+     *         xa-rbrollback: The broker marked the transaction branch rollback-only for
an unspecified
+     *         reason.
+     *         <p/>
+     *         xa-rbtimeout: The work represented by this transaction branch took too long.
+     * @throws QpidException If the session fails to prepare due to some error
+     */
+    public short dtxCoordinationPrepare(Xid xid) throws QpidException;
+
+    /**
+     * This method is called to obtain a list of transaction branches that are in a prepared
or
+     * heuristically completed state.
+     *
+     * @return an array of xids to be recovered.
+     * @throws QpidException If the session fails to recover due to some error
+     */
+    public Xid[] dtxCoordinationRecover() throws QpidException;
+
+    /**
+     * This method rolls back the work associated with xid. Any produced messages are discarded
and
+     * any consumed messages are re-enqueued.
+     *
+     * @param xid Specifies the xid of the transaction branch that can be rolled back.
+     * @throws QpidException If the session fails to rollback due to some error
+     */
+    public void dtxCoordinationRollback(Xid xid) throws QpidException;
+
+    /**
+     * Sets the specified transaction branch timeout value in seconds.
+     *
+     * @param xid     Specifies the xid of the transaction branch for setting the timeout.
+     * @param timeout The transaction timeout value in seconds.
+     * @throws QpidException If the session fails to set the timeout due to some error
+     */
+    public void dtxCoordinationSetTimeout(Xid xid, long timeout) throws QpidException;
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java?view=diff&rev=563197&r1=563196&r2=563197
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
(original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
Mon Aug  6 10:15:56 2007
@@ -42,7 +42,7 @@
     /**
      * Maps from session id (Integer) to SessionImpl instance
      */
-    private final Vector<SessionImpl> _sessions = new Vector<SessionImpl>();
+    protected final Vector<SessionImpl> _sessions = new Vector<SessionImpl>();
 
     /**
      * This is the clientID
@@ -113,10 +113,18 @@
      * @return A newly created session
      * @throws JMSException If the Connection object fails to create a session due to some
internal error.
      */
-    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
+    public synchronized Session createSession(boolean transacted, int acknowledgeMode) throws
JMSException
     {
         checkNotClosed();
-        SessionImpl session = new SessionImpl(this, transacted, acknowledgeMode);
+        SessionImpl session = null;
+        try
+        {
+            session = new SessionImpl(this, transacted, acknowledgeMode, false);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
         // add this session with the list of session that are handled by this connection
         _sessions.add(session);
         return session;
@@ -178,7 +186,7 @@
      * @return the <CODE>ExceptionListener</CODE> for this connection
      * @throws JMSException In case of unforeseen problem
      */
-    public ExceptionListener getExceptionListener() throws JMSException
+    public synchronized ExceptionListener getExceptionListener() throws JMSException
     {
         checkNotClosed();
         return _exceptionListener;
@@ -203,7 +211,7 @@
      * @param exceptionListener The connection listener.
      * @throws JMSException If the connection is closed.
      */
-    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException
+    public synchronized void setExceptionListener(ExceptionListener exceptionListener) throws
JMSException
     {
         checkNotClosed();
         _exceptionListener = exceptionListener;
@@ -217,7 +225,7 @@
      *
      * @throws JMSException In case of a problem due to some internal error.
      */
-    public void start() throws JMSException
+    public synchronized void start() throws JMSException
     {
         checkNotClosed();
         if (!_started)
@@ -231,7 +239,7 @@
                 }
                 catch (Exception e)
                 {
-                   throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+                    throw ExceptionHelper.convertQpidExceptionToJMSException(e);
                 }
             }
             _started = true;
@@ -248,7 +256,7 @@
      *
      * @throws JMSException In case of a problem due to some internal error.
      */
-    public void stop() throws JMSException
+    public synchronized void stop() throws JMSException
     {
         checkNotClosed();
         if (_started)
@@ -262,7 +270,7 @@
                 }
                 catch (Exception e)
                 {
-                   throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+                    throw ExceptionHelper.convertQpidExceptionToJMSException(e);
                 }
             }
             _started = false;
@@ -284,7 +292,7 @@
      *
      * @throws JMSException In case of a problem due to some internal error.
      */
-    public void close() throws JMSException
+    public synchronized void close() throws JMSException
     {
         checkNotClosed();
         if (!_isClosed)
@@ -320,8 +328,8 @@
      * @throws JMSException In case of a problem due to some internal error.
      */
     public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
-                                                       ServerSessionPool sessionPool, int
maxMessages) throws
-                                                                                        
              JMSException
+                                                       ServerSessionPool sessionPool, int
maxMessages)
+            throws JMSException
     {
         checkNotClosed();
         return null;
@@ -359,7 +367,7 @@
      * @return A queueSession object/
      * @throws JMSException If creating a QueueSession fails due to some internal error.
      */
-    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws
JMSException
+    public synchronized QueueSession createQueueSession(boolean transacted, int acknowledgeMode)
throws JMSException
     {
         checkNotClosed();
         QueueSessionImpl queueSession = new QueueSessionImpl(this, transacted, acknowledgeMode);
@@ -380,8 +388,8 @@
      * @throws JMSException In case of a problem due to some internal error.
      */
     public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
-                                                       ServerSessionPool sessionPool, int
maxMessages) throws
-                                                                                        
              JMSException
+                                                       ServerSessionPool sessionPool, int
maxMessages)
+            throws JMSException
     {
         return createConnectionConsumer((Destination) queue, messageSelector, sessionPool,
maxMessages);
     }
@@ -396,7 +404,7 @@
      * @return a newly created topic session
      * @throws JMSException If creating the session fails due to some internal error.
      */
-    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws
JMSException
+    public synchronized TopicSession createTopicSession(boolean transacted, int acknowledgeMode)
throws JMSException
     {
         checkNotClosed();
         TopicSessionImpl session = new TopicSessionImpl(this, transacted, acknowledgeMode);
@@ -418,8 +426,8 @@
      * @throws JMSException In case of a problem due to some internal error.
      */
     public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
-                                                       ServerSessionPool sessionPool, int
maxMessages) throws
-                                                                                        
              JMSException
+                                                       ServerSessionPool sessionPool, int
maxMessages)
+            throws JMSException
     {
         return createConnectionConsumer((Destination) topic, messageSelector, sessionPool,
maxMessages);
     }
@@ -442,7 +450,8 @@
             {
                 _logger.debug("Connection has been closed. Cannot invoke any further operations.");
             }
-            throw new javax.jms.IllegalStateException("Connection has been closed. Cannot
invoke any further operations.");
+            throw new javax.jms.IllegalStateException(
+                    "Connection has been closed. Cannot invoke any further operations.");
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java?view=diff&rev=563197&r1=563196&r2=563197
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
(original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
Mon Aug  6 10:15:56 2007
@@ -20,6 +20,7 @@
 import org.apache.qpidity.QpidException;
 
 import javax.jms.JMSException;
+import javax.transaction.xa.XAException;
 
 /**
  * Helper class for handling exceptions
@@ -46,5 +47,13 @@
             jmsException = (JMSException) exception;
         }
         return jmsException;
+    }
+
+    static public XAException convertQpidExceptionToXAException(QpidException exception)
+    {
+        String qpidErrorCode = exception.getErrorCode();
+        // todo map this error to an XA code
+        int xaCode = XAException.XAER_PROTO;
+        return new XAException(xaCode);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java?view=diff&rev=563197&r1=563196&r2=563197
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
(original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
Mon Aug  6 10:15:56 2007
@@ -305,10 +305,10 @@
             throw new IllegalArgumentException("Time to live must be non-negative - supplied
value was " + timeToLive);
         }
         // check that the message is not a foreign one
-
+        // todo
         // set the properties
 
-        //
+        // todo
 
         // dispatch it
         // todo getSession().getQpidSession().messageTransfer(((DestinationImpl) destination).getExchangeName(),
message, Option);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java?view=diff&rev=563197&r1=563196&r2=563197
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
(original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
Mon Aug  6 10:15:56 2007
@@ -128,10 +128,12 @@
      * @param transacted      Indicates if the session transacted.
      * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and
set to
      *                        {@link Session#SESSION_TRANSACTED} if the <code>transacted</code>
parameter is true.
+     * @param isXA            Indicates whether this session is an XA session.
      * @throws JMSSecurityException If the user could not be authenticated.
-     * @throws JMSException         In case of internal error.
+     * @throws QpidException        In case of internal error.
      */
-    protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode)
throws JMSException
+    protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode,
boolean isXA)
+            throws QpidException
     {
         _connection = connection;
         _transacted = transacted;
@@ -141,19 +143,12 @@
             acknowledgeMode = Session.SESSION_TRANSACTED;
         }
         _acknowledgeMode = acknowledgeMode;
-        try
-        {
-            // create the qpid session with an expiry  <= 0 so that the session does not
expire
-            _qpidSession = _connection.getQpidConnection().createSession(0);
-            // set transacted if required
-            if (_transacted)
-            {
-                //_qpidSession.setTransacted();
-            }
-        }
-        catch (QpidException e)
+        // create the qpid session with an expiry  <= 0 so that the session does not expire
+        _qpidSession = _connection.getQpidConnection().createSession(0);
+        // set transacted if required
+        if (_transacted && !isXA)
         {
-            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+            _qpidSession.txSelect();
         }
         // init the message dispatcher.
         initMessageDispatcherThread();
@@ -314,7 +309,6 @@
         // commit the underlying Qpid Session
         try
         {
-            // Note: this operation makes sure that asynch message processing has returned
             _qpidSession.txCommit();
         }
         catch (QpidException e)
@@ -341,7 +335,6 @@
         // rollback the underlying Qpid Session
         try
         {
-            // Note: this operation makes sure that asynch message processing has returned
             _qpidSession.txRollback();
         }
         catch (org.apache.qpidity.QpidException e)
@@ -640,7 +633,7 @@
         }
         catch (QpidException e)
         {
-           throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
         }
         return result;
     }

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java?view=auto&rev=563197
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
(added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
Mon Aug  6 10:15:56 2007
@@ -0,0 +1,54 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.XAConnection;
+import javax.jms.JMSException;
+import javax.jms.XASession;
+
+/**
+ * This class implements the javax.jms.XAConnection interface
+ */
+public class XAConnectionImpl extends ConnectionImpl implements XAConnection
+{
+    /**
+     * Creates an XASession.
+     *
+     * @return A newly created XASession.
+     * @throws JMSException If the XAConnection fails to create an XASession due to
+     *                      some internal error.
+     */
+    public synchronized XASession createXASession() throws JMSException
+    {
+        checkNotClosed();
+        XASessionImpl xasession;
+        try
+        {
+            xasession = new XASessionImpl(this);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        // add this session to the list of sessions that are handled by this connection
+        _sessions.add(xasession);
+        return xasession;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java?view=auto&rev=563197
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
(added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
Mon Aug  6 10:15:56 2007
@@ -0,0 +1,329 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.QpidException;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import javax.transaction.xa.XAException;
+
+/**
+ * This is an implementation of javax.transaction.xa.XAResource.
+ */
+public class XAResourceImpl implements XAResource
+{
+    /**
+     * this XAResourceImpl's logger
+     */
+    private static final Logger _logger = LoggerFactory.getLogger(XAResourceImpl.class);
+
+    /**
+     * Reference to the associated XASession
+     */
+    private XASessionImpl _xaSession = null;
+
+    /**
+     * The XID of this resource
+     */
+    private Xid _xid;
+
+    //--- constructor
+
+    /**
+     * Create an XAResource associated with a XASession
+     *
+     * @param xaSession The session XAresource
+     */
+    protected XAResourceImpl(XASessionImpl xaSession)
+    {
+        _xaSession = xaSession;
+    }
+
+    //--- The XAResource
+    /**
+     * Commits the global transaction specified by xid.
+     *
+     * @param xid A global transaction identifier
+     * @param b   If true, use a one-phase commit protocol to commit the work done on behalf
of xid.
+     * @throws XAException An error has occurred. Possible XAExceptions are XAER_RMERR, XAER_NOTA
or XAER_PROTO.
+     */
+    public void commit(Xid xid, boolean b) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("commit ", xid);
+        }
+        try
+        {
+            _xaSession.getQpidSession().dtxCoordinationCommit(xid, b ? Option.ONE_PHASE :
Option.NO_OPTION);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(e);
+        }
+    }
+
+    /**
+     * Ends the work performed on behalf of a transaction branch.
+     * The resource manager disassociates the XA resource from the transaction branch specified
+     * and lets the transaction complete.
+     * <ul>
+     * <li> If TMSUSPEND is specified in the flags, the transaction branch is temporarily
suspended in an incomplete state.
+     * The transaction context is in a suspended state and must be resumed via the start
method with TMRESUME specified.
+     * <li> If TMFAIL is specified, the portion of work has failed. The resource manager
may mark the transaction as rollback-only
+     * <li> If TMSUCCESS is specified, the portion of work has completed successfully.
+     * </ul>
+     *
+     * @param xid  A global transaction identifier that is the same as the identifier used
previously in the start method
+     * @param flag One of TMSUCCESS, TMFAIL, or TMSUSPEND.
+     * @throws XAException An error has occurred. Possible XAException values
+     *                     are XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, XAER_PROTO,
or XA_RB*.
+     */
+    public void end(Xid xid, int flag) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("end ", xid);
+        }
+        try
+        {
+            _xid = null;
+            _xaSession.getQpidSession()
+                    .dtxDemarcationEnd(xid, flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
+                                       flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(e);
+        }
+    }
+
+    /**
+     * Tells the resource manager to forget about a heuristically completed transaction branch.
+     *
+     * @param xid A global transaction identifier
+     * @throws XAException An error has occurred. Possible exception values are XAER_RMERR,
XAER_RMFAIL,
+     *                     XAER_NOTA, XAER_INVAL, or XAER_PROTO.
+     */
+    public void forget(Xid xid) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("forget ", xid);
+        }
+        try
+        {
+            _xaSession.getQpidSession()
+                    .dtxCoordinationForget(xid);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(e);
+        }
+    }
+
+    /**
+     * Obtains the current transaction timeout value set for this XAResource instance.
+     * If XAResource.setTransactionTimeout was not used prior to invoking this method,
+     * the return value is the default timeout i.e. 0;
+     * otherwise, the value used in the previous setTransactionTimeout call is returned.
+     *
+     * @return The transaction timeout value in seconds.
+     * @throws XAException An error has occurred. Possible exception values are XAER_RMERR,
XAER_RMFAIL.
+     */
+    public int getTransactionTimeout() throws XAException
+    {
+        int result = 0;
+        if (_xid != null)
+        {
+            try
+            {
+                result = (int) _xaSession.getQpidSession().dtxCoordinationGetTimeout(_xid);
+            }
+            catch (QpidException e)
+            {
+                throw ExceptionHelper.convertQpidExceptionToXAException(e);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * This method is called to determine if the resource manager instance represented
+     * by the target object is the same as the resource manager instance represented by
+     * the parameter xaResource.
+     *
+     * @param xaResource An XAResource object whose resource manager instance is to
+     *                   be compared with the resource manager instance of the target object
+     * @return <code>true</code> if it's the same RM instance; otherwise <code>false</code>.
+     * @throws XAException An error has occurred. Possible exception values are XAER_RMERR,
XAER_RMFAIL.
+     */
+    public boolean isSameRM(XAResource xaResource) throws XAException
+    {
+        // TODO : get the server identity of xaResource and compare it with our own one
+        return false;
+    }
+
+    /**
+     * Prepare for a transaction commit of the transaction specified in <code>Xid</code>.
+     * <p/>
+     * The vote is obtained from the underlying Qpid DTX session. A read-only vote is
+     * returned to the caller (it is a legal return value of <code>prepare</code>, not an
+     * error), whereas a rollback vote is reported by throwing an XAException.
+     *
+     * @param xid A global transaction identifier.
+     * @return A value indicating the resource manager's vote on the outcome of the transaction.
+     *         The possible values are: XA_RDONLY or XA_OK.
+     * @throws XAException An error has occurred. Possible exception values are: XAER_RMERR
+     *                     or XAER_NOTA; XA_RBROLLBACK is thrown when the session votes to
+     *                     roll back. A QpidException raised by the session is converted
+     *                     through ExceptionHelper.
+     */
+    public int prepare(Xid xid) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // the {} placeholder is required for the xid to appear in the message
+            // (assumes an SLF4J-style logger, as implied by the debug(String, Object) call)
+            _logger.debug("prepare {}", xid);
+        }
+        int result;
+        try
+        {
+            result = _xaSession.getQpidSession().dtxCoordinationPrepare(xid);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(e);
+        }
+        if (result == XAException.XA_RBROLLBACK)
+        {
+            // the resource manager wants to roll back: this must be signalled by an exception
+            throw new XAException(XAException.XA_RBROLLBACK);
+        }
+        // XA_RDONLY shares its numeric value between XAException and XAResource,
+        // so a read-only vote is handed back unchanged as the method result.
+        return result;
+    }
+
+    /**
+     * Obtains a list of prepared transaction branches.
+     * <p/>
+     * The transaction manager calls this method during recovery to obtain the list of
+     * transaction branches that are currently in prepared or heuristically completed states.
+     * The list is requested from the underlying Qpid DTX session; the scan flag is not
+     * passed on to it.
+     *
+     * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS.
+     *             TMNOFLAGS must be used when no other flags are set in the parameter.
+     *             The value is currently ignored.
+     * @return zero or more XIDs of the transaction branches that are currently in a prepared
+     *         or heuristically completed state.
+     * @throws XAException An error has occurred. Possible value is XAER_INVAL.
+     */
+    public Xid[] recover(int flag) throws XAException
+    {
+        final Xid[] preparedBranches;
+        try
+        {
+            // the flag is ignored
+            preparedBranches = _xaSession.getQpidSession().dtxCoordinationRecover();
+        }
+        catch (QpidException cause)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(cause);
+        }
+        return preparedBranches;
+    }
+
+    /**
+     * Informs the resource manager to roll back work done on behalf of a transaction branch.
+     * <p/>
+     * The request is delegated to the underlying Qpid DTX session.
+     *
+     * @param xid A global transaction identifier.
+     * @throws XAException An error has occurred; a QpidException raised by the session is
+     *                     converted through ExceptionHelper.
+     */
+    public void rollback(Xid xid) throws XAException
+    {
+        try
+        {
+            _xaSession.getQpidSession().dtxCoordinationRollback(xid);
+        }
+        catch (QpidException cause)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(cause);
+        }
+    }
+
+    /**
+     * Sets the current transaction timeout value for this XAResource instance.
+     * Once set, this timeout value is effective until setTransactionTimeout is
+     * invoked again with a different value.
+     * To reset the timeout value to the default value used by the resource manager,
+     * set the value to zero.
+     * <p/>
+     * The timeout is applied to the xid currently held by this resource; when no xid
+     * is held, nothing is sent to the session and <code>false</code> is returned.
+     *
+     * @param timeout The transaction timeout value in seconds.
+     * @return true if transaction timeout value is set successfully; otherwise false.
+     * @throws XAException An error has occurred. Possible exception values are XAER_RMERR,
+     *                     XAER_RMFAIL, or XAER_INVAL.
+     */
+    public boolean setTransactionTimeout(int timeout) throws XAException
+    {
+        if (_xid == null)
+        {
+            // no transaction branch is associated with this resource
+            return false;
+        }
+        try
+        {
+            _xaSession.getQpidSession().dtxCoordinationSetTimeout(_xid, timeout);
+        }
+        catch (QpidException cause)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(cause);
+        }
+        return true;
+    }
+
+    /**
+     * Starts work on behalf of a transaction branch specified in xid.
+     * <p/>
+     * The xid is remembered by this resource (it is the one later used by the timeout
+     * accessors) and the start request is delegated to the underlying Qpid DTX session:
+     * <ul>
+     * <li> If flag equals TMJOIN, the JOIN option is passed to the session.
+     * NOTE(review): this documentation previously stated that TMJOIN is rejected as
+     * unsupported; no such check is made here, so confirm whether the session rejects it.
+     * <li> If flag equals TMRESUME, the RESUME option is passed to the session so that the
+     * suspended transaction specified in the parameter xid is resumed.
+     * <li> If neither TMJOIN nor TMRESUME is specified and the transaction specified by xid
+     * has previously been seen by the resource manager, the resource manager throws the
+     * XAException exception with XAER_DUPID error code.
+     * </ul>
+     *
+     * @param xid  A global transaction identifier to be associated with the resource
+     * @param flag One of TMNOFLAGS, TMJOIN, or TMRESUME
+     * @throws XAException An error has occurred. Possible exceptions
+     *                     are XA_RB*, XAER_RMERR, XAER_RMFAIL, XAER_DUPID, XAER_OUTSIDE,
+     *                     XAER_NOTA, XAER_INVAL, or XAER_PROTO.
+     */
+    public void start(Xid xid, int flag) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // the {} placeholder is required for the xid to appear in the message
+            // (assumes an SLF4J-style logger, as implied by the debug(String, Object) call)
+            _logger.debug("start {}", xid);
+        }
+        // recorded before the session call, so the xid is kept even if the call fails
+        _xid = xid;
+        try
+        {
+            _xaSession.getQpidSession()
+                    .dtxDemarcationStart(xid, flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
+                                         flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(e);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java?view=auto&rev=563197
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
(added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
Mon Aug  6 10:15:56 2007
@@ -0,0 +1,118 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.DtxSession;
+
+import javax.jms.XASession;
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.TransactionInProgressException;
+import javax.transaction.xa.XAResource;
+
+/**
+ * This is an implementation of the javax.jms.XASession interface.
+ * <p/>
+ * The session owns one Qpid DtxSession and one XAResourceImpl; transaction boundaries
+ * are meant to be driven through that XAResource, which is why the local
+ * {@link #commit()} and {@link #rollback()} operations always fail.
+ */
+public class XASessionImpl extends SessionImpl implements XASession
+{
+    /**
+     * XAResource associated with this XASession
+     */
+    private final XAResourceImpl _xaResource;
+
+    /**
+     * This XASession Qpid DtxSession
+     */
+    private final DtxSession _qpidDtxSession;
+
+    //-- Constructors
+    /**
+     * Create a JMS XASession
+     *
+     * @param connection The ConnectionImpl object from which the Session is created.
+     * @throws QpidException In case of internal error.
+     */
+    protected XASessionImpl(ConnectionImpl connection) throws QpidException
+    {
+        super(connection, true,  // this is a transacted session
+              Session.SESSION_TRANSACTED, // the ack mode is transacted
+              true); // this is an XA session so do not set tx
+        // the DTX session must exist before the XAResource is built around this session
+        _qpidDtxSession = getConnection().getQpidConnection().createDTXSession(0);
+        _xaResource = new XAResourceImpl(this);
+    }
+
+    //--- javax.jms.XASession API
+
+    /**
+     * Gets the session associated with this XASession.
+     *
+     * @return the session object, which is this object itself
+     * @throws JMSException if an internal error occurs.
+     * @since 1.1
+     */
+    public Session getSession() throws JMSException
+    {
+        return this;
+    }
+
+    /**
+     * Returns an XA resource.
+     *
+     * @return The XA resource created together with this session.
+     */
+    public XAResource getXAResource()
+    {
+        return _xaResource;
+    }
+
+    //-- overridden methods
+    /**
+     * Throws a {@link TransactionInProgressException}, since it should
+     * not be called for an XASession object.
+     *
+     * @throws TransactionInProgressException always.
+     */
+    public void commit() throws JMSException
+    {
+        throw new TransactionInProgressException(
+                "XASession:  A direct invocation of the commit operation is prohibited!");
+    }
+
+    /**
+     * Throws a {@link TransactionInProgressException}, since it should
+     * not be called for an XASession object.
+     *
+     * @throws TransactionInProgressException always.
+     */
+    public void rollback() throws JMSException
+    {
+        throw new TransactionInProgressException(
+                "XASession: A direct invocation of the rollback operation is prohibited!");
+    }
+
+    /**
+     * Access to the underlying Qpid Session
+     *
+     * @return The associated Qpid DtxSession.
+     */
+    protected DtxSession getQpidSession()
+    {
+        return _qpidDtxSession;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message