activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1405942 [2/2] - in /activemq/activemq-apollo/trunk: apollo-amqp/ apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-amqp/src/main/scala/org/apache...
Date Mon, 05 Nov 2012 20:32:49 GMT
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java
(added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java
Mon Nov  5 20:32:47 2012
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch;
+
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import java.nio.ByteBuffer;
+
+public class DroppingWritableBuffer implements WritableBuffer
+{
+    int pos = 0;
+
+    @Override
+    public boolean hasRemaining() {
+        return true;
+    }
+
+    @Override
+    public void put(byte b) {
+        pos += 1;
+    }
+
+    @Override
+    public void putFloat(float f) {
+        pos += 4;
+    }
+
+    @Override
+    public void putDouble(double d) {
+        pos += 8;
+    }
+
+    @Override
+    public void put(byte[] src, int offset, int length) {
+        pos += length;
+    }
+
+    @Override
+    public void putShort(short s) {
+        pos += 2;
+    }
+
+    @Override
+    public void putInt(int i) {
+        pos += 4;
+    }
+
+    @Override
+    public void putLong(long l) {
+        pos += 8;
+    }
+
+    @Override
+    public int remaining() {
+        return Integer.MAX_VALUE - pos;
+    }
+
+    @Override
+    public int position() {
+        return pos;
+    }
+
+    @Override
+    public void position(int position) {
+        pos = position;
+    }
+
+    @Override
+    public void put(ByteBuffer payload) {
+        pos += payload.remaining();
+        payload.position(payload.limit());
+    }
+}

Copied: activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/test/SwiftMQClientTest.scala
(from r1405941, activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/SwiftMQClientTest.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/test/SwiftMQClientTest.scala?p2=activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/test/SwiftMQClientTest.scala&p1=activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/SwiftMQClientTest.scala&r1=1405941&r2=1405942&rev=1405942&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/SwiftMQClientTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/test/SwiftMQClientTest.scala
Mon Nov  5 20:32:47 2012
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.apollo.amqp
+package org.apache.activemq.apollo.amqp.test
 
 import com.swiftmq.amqp.AMQPContext
 import com.swiftmq.amqp.v100.client.Connection
@@ -94,4 +94,4 @@ class SwiftMQClientTest extends AmqpTest
     }
 
   }
-}
\ No newline at end of file
+}

Copied: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/provider.properties
(from r1405941, activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/provider.properties?p2=activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/provider.properties&p1=activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index&r1=1405941&r2=1405942&rev=1405942&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/provider.properties
Mon Nov  5 20:32:47 2012
@@ -5,13 +5,16 @@
 ## The ASF licenses this file to You under the Apache License, Version 2.0
 ## (the "License"); you may not use this file except in compliance with
 ## the License.  You may obtain a copy of the License at
-## 
+##
 ## http://www.apache.org/licenses/LICENSE-2.0
-## 
+##
 ## Unless required by applicable law or agreed to in writing, software
 ## distributed under the License is distributed on an "AS IS" BASIS,
 ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.amqp.AmqpProtocolFactory
\ No newline at end of file
+
+# This config file is used by the joram jms tests.
+#
+jms.provider.admin.class=org.apache.activemq.apollo.amqp.joram.ApolloAdmin
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/hawtdispatch/TransportConnectionTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/hawtdispatch/TransportConnectionTest.java?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/hawtdispatch/TransportConnectionTest.java
(added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/hawtdispatch/TransportConnectionTest.java
Mon Nov  5 20:32:47 2012
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch;
+
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+import org.fusesource.hawtdispatch.transport.*;
+import org.junit.After;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.fusesource.hawtdispatch.Dispatch.NOOP;
+import static org.fusesource.hawtdispatch.Dispatch.createQueue;
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class TransportConnectionTest {
+
+    // Using a single dispatch queue for everything should simplify debugging issues.
+    final DispatchQueue queue = createQueue();
+
+    @Test
+    public void testOpenSession() throws Exception {
+        final CountDownLatch done = new CountDownLatch(1);
+
+        // Setup a little server...
+        final TcpTransportServer server = startServer(new AmqpListener() {
+            public void proccessSessionOpen(Session session, Task onComplete) {
+                System.out.println("session opened..");
+                session.open();
+                done.countDown();
+            }
+        });
+        final String address = server.getBoundAddress();
+
+        // Start a client..
+        queue.execute(new Task() {
+            public void run() {
+                try {
+                    System.out.println("Creating a client connection.");
+                    AmqpConnection c = startClient(address);
+                    Session session = c.getProtonConnection().session();
+                    session.open();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        assertTrue(done.await(3, TimeUnit.SECONDS));
+    }
+
+    final ArrayList<AmqpConnection> clients = new ArrayList<AmqpConnection>();
+    synchronized private AmqpConnection startClient(String address) throws Exception {
+        TcpTransport transport = new TcpTransport();
+        transport.setDispatchQueue(queue);
+        transport.connecting(new URI(address), null);
+        final AmqpConnection clientConnection = new AmqpConnection();
+        clientConnection.setListener(new AmqpListener(){
+            @Override
+            public void processTransportConnected() {
+                clientConnection.pumpOut();
+            }
+        });
+        clientConnection.bind(transport);
+        clients.add(clientConnection);
+        clientConnection.start(NOOP);
+        clientConnection.getProtonConnection().open();
+        return clientConnection;
+    }
+
+    @After
+    synchronized public void stopClients() throws Exception {
+        for (AmqpConnection client : clients) {
+            stop(client);
+        }
+        clients.clear();
+    }
+
+    final ArrayList<TransportServer> servers = new ArrayList<TransportServer>();
+    synchronized protected TcpTransportServer startServer(final AmqpListener serverHandler) throws Exception {
+        final TcpTransportServer server = new TcpTransportServer(new URI("tcp://localhost:0"));
+        server.setDispatchQueue(queue);
+        server.setTransportServerListener(new TransportServerListener() {
+            public void onAccept(Transport transport) throws Exception {
+                System.out.println("Server accepted a client connection.");
+                transport.setDispatchQueue(queue);
+                AmqpConnection serverConnection = new AmqpConnection();
+                serverConnection.bind(transport);
+                serverConnection.setListener(serverHandler);
+                serverConnection.start(NOOP);
+            }
+
+            public void onAcceptError(Exception error) {
+                error.printStackTrace();
+            }
+        });
+        start(server);
+        servers.add(server);
+        return server;
+    }
+
+    @After
+    synchronized public void stopServers() throws Exception {
+        for (TransportServer server : servers) {
+            stop(server);
+        }
+        servers.clear();
+    }
+
+    private void start(TransportServer transport) throws Exception {
+        final CountDownLatch done = new CountDownLatch(1);
+        transport.start(new Task() {
+            @Override
+            public void run() {
+                done.countDown();
+            }
+        });
+        done.await();
+    }
+    private void stop(TransportServer transport) throws Exception {
+        final CountDownLatch done = new CountDownLatch(1);
+        transport.stop(new Task() {
+            @Override
+            public void run() {
+                done.countDown();
+            }
+        });
+        done.await();
+    }
+
+    private void start(AmqpConnection transport) throws Exception {
+        final CountDownLatch done = new CountDownLatch(1);
+        transport.start(new Task() {
+            @Override
+            public void run() {
+                done.countDown();
+            }
+        });
+        done.await();
+    }
+    private void stop(AmqpConnection transport) throws Exception {
+        final CountDownLatch done = new CountDownLatch(1);
+        transport.stop(new Task() {
+            @Override
+            public void run() {
+                done.countDown();
+            }
+        });
+        done.await();
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/ApolloAdmin.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/ApolloAdmin.java?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/ApolloAdmin.java
(added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/ApolloAdmin.java
Mon Nov  5 20:32:47 2012
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.joram;
+
+import org.apache.activemq.apollo.broker.Broker;
+import org.apache.activemq.apollo.broker.BrokerFactory;
+import org.apache.activemq.apollo.util.ServiceControl;
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
+import org.objectweb.jtests.jms.admin.Admin;
+
+import javax.jms.ConnectionFactory;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.util.Hashtable;
+import java.util.logging.*;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ApolloAdmin implements Admin {
+
+    Context context;
+    {
+        // enableJMSFrameTracing();
+        try {
+            // Use the jetty JNDI context since it's mutable.
+            final Hashtable<String, String> env = new Hashtable<String, String>();
+            env.put("java.naming.factory.initial", "org.eclipse.jetty.jndi.InitialContextFactory");
+            env.put("java.naming.factory.url.pkgs", "org.eclipse.jetty.jndi");;
+            context = new InitialContext(env);
+        } catch (NamingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static public void enableJMSFrameTracing() throws FileNotFoundException {
+        final SimpleFormatter formatter = new SimpleFormatter();
+        final PrintStream out = new PrintStream(new FileOutputStream(new File("/tmp/amqp-trace.txt")));
+        Handler handler = new Handler() {
+            @Override
+            public void publish(LogRecord r) {
+                out.println(String.format("%s:%s", r.getLoggerName(), r.getMessage()));
+            }
+
+            @Override
+            public void flush() {
+                out.flush();
+            }
+
+            @Override
+            public void close() throws SecurityException {
+            }
+        };
+
+        Logger log = Logger.getLogger("FRM");
+        log.addHandler(handler);
+        log.setLevel(Level.FINEST);
+    }
+
+    public String brokerConfig = "xml:classpath:apollo-amqp.xml";
+
+    public String getName() {
+        return getClass().getName();
+    }
+
+    static Broker broker;
+    static int port;
+
+    public void startServer() throws Exception {
+//        if( broker!=null ) {
+//            stopServer();
+//        }
+        broker = createBroker();
+        ServiceControl.start(broker);
+        port = ((InetSocketAddress) broker.get_socket_address()).getPort();
+    }
+
+    protected Broker createBroker() throws Exception {
+        if (System.getProperty("basedir") == null) {
+            File file = new File(".");
+            System.setProperty("basedir", file.getAbsolutePath());
+        }
+        return BrokerFactory.createBroker(brokerConfig);
+    }
+
+    public void stopServer() throws Exception {
+        ServiceControl.stop(broker);
+        broker = null;
+    }
+
+    public void start() throws Exception {
+    }
+
+    public void stop() throws Exception {
+    }
+
+    public Context createContext() throws NamingException {
+        return context;
+    }
+
+    public void createQueue(String name) {
+        try {
+            context.bind(name, new QueueImpl("/queue/"+name));
+        } catch (NamingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void createTopic(String name) {
+        try {
+            context.bind(name, new TopicImpl("/topic/"+name));
+        } catch (NamingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void deleteQueue(String name) {
+        // BrokerTestSupport.delete_queue((Broker)base.broker, name);
+        try {
+            context.unbind(name);
+        } catch (NamingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void deleteTopic(String name) {
+        try {
+            context.unbind(name);
+        } catch (NamingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void createConnectionFactory(String name) {
+        try {
+            final ConnectionFactory factory = new ConnectionFactoryImpl("localhost", port, null, null);
+            context.bind(name, factory);
+        } catch (NamingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void deleteConnectionFactory(String name) {
+        try {
+            context.unbind(name);
+        } catch (NamingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void createQueueConnectionFactory(String name) {
+        createConnectionFactory(name);
+    }
+    public void createTopicConnectionFactory(String name) {
+        createConnectionFactory(name);
+    }
+    public void deleteQueueConnectionFactory(String name) {
+        deleteConnectionFactory(name);
+    }
+    public void deleteTopicConnectionFactory(String name) {
+        deleteConnectionFactory(name);
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java
(added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java
Mon Nov  5 20:32:47 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.joram;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+import org.objectweb.jtests.jms.conform.connection.ConnectionTest;
+import org.objectweb.jtests.jms.conform.connection.TopicConnectionTest;
+import org.objectweb.jtests.jms.conform.message.MessageBodyTest;
+import org.objectweb.jtests.jms.conform.message.MessageDefaultTest;
+import org.objectweb.jtests.jms.conform.message.MessageTypeTest;
+import org.objectweb.jtests.jms.conform.message.headers.MessageHeaderTest;
+import org.objectweb.jtests.jms.conform.message.properties.JMSXPropertyTest;
+import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyConversionTest;
+import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyTest;
+import org.objectweb.jtests.jms.conform.queue.QueueBrowserTest;
+import org.objectweb.jtests.jms.conform.queue.TemporaryQueueTest;
+import org.objectweb.jtests.jms.conform.selector.SelectorSyntaxTest;
+import org.objectweb.jtests.jms.conform.selector.SelectorTest;
+import org.objectweb.jtests.jms.conform.session.QueueSessionTest;
+import org.objectweb.jtests.jms.conform.session.SessionTest;
+import org.objectweb.jtests.jms.conform.session.TopicSessionTest;
+import org.objectweb.jtests.jms.conform.session.UnifiedSessionTest;
+import org.objectweb.jtests.jms.conform.topic.TemporaryTopicTest;
+
+import java.io.File;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class JoramJmsTest extends TestCase {
+
+    public static Test suite() {
+        if (System.getProperty("basedir") == null) {
+            File file = new File(".");
+            System.setProperty("basedir", file.getAbsolutePath());
+        }
+
+        TestSuite suite = new TestSuite();
+
+        // TODO: Fix these tests..
+        if (false) {
+            // Fails due to durable subs not being implemented.
+            suite.addTestSuite(TopicSessionTest.class);
+            // Fails due to https://issues.apache.org/jira/browse/PROTON-110 and DestinationImpl vs QueueImpl mapping issues
+            suite.addTestSuite(MessageHeaderTest.class);
+            // Fails due to inconsistent Message mapping in the JMS client.
+            suite.addTestSuite(MessageTypeTest.class);
+            suite.addTestSuite(QueueBrowserTest.class);
+
+        }
+
+        // TODO: enable once QPID 0.19 is released
+        if(false) {
+            suite.addTestSuite(UnifiedSessionTest.class);
+            suite.addTestSuite(TemporaryTopicTest.class);
+            suite.addTestSuite(TopicConnectionTest.class);
+        }
+
+        if( false ) {
+            suite.addTestSuite(SelectorSyntaxTest.class);
+            suite.addTestSuite(QueueSessionTest.class);
+            suite.addTestSuite(SelectorTest.class);
+            suite.addTestSuite(TemporaryQueueTest.class);
+            suite.addTestSuite(SessionTest.class);
+        }
+
+        // Passing tests
+        if( false ) {
+            suite.addTestSuite(ConnectionTest.class);
+            suite.addTestSuite(JMSXPropertyTest.class);
+            suite.addTestSuite(MessageBodyTest.class);
+            suite.addTestSuite(MessageDefaultTest.class);
+            suite.addTestSuite(MessagePropertyConversionTest.class);
+            suite.addTestSuite(MessagePropertyTest.class);
+        }
+
+        return suite;
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpTest.scala?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpTest.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpTest.scala
Mon Nov  5 20:32:47 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.test
+
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.BeforeAndAfterEach
+import org.apache.activemq.apollo.broker._
+import org.junit.Test
+import org.apache.qpid.amqp_1_0.jms.impl.{ConnectionFactoryImpl, QueueImpl}
+import javax.jms._
+
+class AmqpTestSupport extends BrokerFunSuiteSupport with ShouldMatchers with BeforeAndAfterEach {
+  override def broker_config_uri = "xml:classpath:apollo-amqp.xml"
+}
+

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
Mon Nov  5 20:32:47 2012
@@ -0,0 +1,95 @@
+package org.apache.activemq.apollo.amqp.test
+
+import org.apache.qpid.amqp_1_0.jms.impl.{ConnectionFactoryImpl, QueueImpl}
+import javax.jms._
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+
+class QpidJmsTest extends AmqpTestSupport {
+
+  def createConnection: Connection = {
+    val factory = new ConnectionFactoryImpl("localhost", port, "admin", "password")
+    val connection = factory.createConnection
+    connection.setExceptionListener(new ExceptionListener {
+      def onException(exception: JMSException) {
+        exception.printStackTrace
+      }
+    })
+    connection.start
+    return connection
+  }
+
+
+//  test("browse") {
+//    val queue = new QueueImpl("queue://txqueue")
+//    val connection = createConnection
+//    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+//    val p = session.createProducer(queue)
+//    val msg = session.createTextMessage("Hello World")
+//    msg.setObjectProperty("x", 1)
+//    p.send(msg)
+//    val browser = session.createBrowser(queue)
+//    val enumeration = browser.getEnumeration
+//    while (enumeration.hasMoreElements) {
+//      System.out.println("BROWSE " + enumeration.nextElement)
+//    }
+//    connection.close
+//  }
+
+  test("Send Nack Receive") {
+    val queue = new QueueImpl("/queue/testqueue")
+    val nMsgs = 1
+    val dataFormat: String = "%01024d"
+
+    var connection = createConnection
+    var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val p = session.createProducer(queue)
+    var i = 0
+    while (i < nMsgs) {
+      System.out.println("Sending " + i)
+      p.send(session.createTextMessage(dataFormat.format(i)))
+      i += 1
+    }
+    connection.close
+
+    System.out.println("=======================================================================================")
+    System.out.println(" failing a receive ")
+    System.out.println("=======================================================================================")
+    connection = createConnection
+    session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)
+    var c = session.createConsumer(queue)
+    i = 0
+    while (i < 1) {
+      val msg: TextMessage = c.receive.asInstanceOf[TextMessage]
+      if (msg != null) {
+        val s: String = msg.getText
+        s should  be(dataFormat.format(i))
+        System.out.println("Received: " + i)
+        i += 1
+      }
+    }
+    connection.close
+
+    System.out.println("=======================================================================================")
+    System.out.println(" receiving ")
+    System.out.println("=======================================================================================")
+    connection = createConnection
+    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    c = session.createConsumer(queue)
+    i = 0
+    while (i < nMsgs) {
+      val msg = c.receive.asInstanceOf[TextMessage]
+      if (msg != null) {
+        val s = msg.getText
+        s should  be(dataFormat.format(i))
+        System.out.println("Received: " + i)
+        i += 1
+      }
+    }
+    connection.close
+  }
+
+
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1405942&r1=1405941&r2=1405942&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
Mon Nov  5 20:32:47 2012
@@ -905,47 +905,63 @@ class QueueEntry(val queue:Queue, val se
     }
     override def toString = { "swapped_range:{ swapping_in: "+loading+", count: "+count+", size: "+size+"}" }
 
-    override def swap_in(space:MemorySpace) = {
+    override def swap_in(space:MemorySpace):Unit = {
       if( !loading ) {
         loading = true
-        queue.virtual_host.store.list_queue_entries(queue.store_id, seq, last) { records =>
-          queue.dispatch_queue {
-            loading  = false
-            assert(isLinked)
-
-            var item_count=0
-            var size_count=0
-
-            val tmpList = new LinkedNodeList[QueueEntry]()
-            records.foreach { record =>
-              val entry = new QueueEntry(queue, record.entry_seq).init(record)
-              tmpList.addLast(entry)
-              item_count += 1
-              size_count += record.size
-            }
 
-            // we may need to adjust the enqueue count if entries
-            // were dropped at the store level
-            var item_delta = (count - item_count)
-            val size_delta: Int = size - size_count
+        def complete_load(attempt_counter:Int, records:Seq[QueueEntryRecord]):Unit = {
+          assert(isLinked)
+
+          var item_count=0
+          var size_count=0
+
+          val tmpList = new LinkedNodeList[QueueEntry]()
+          records.foreach { record =>
+            val entry = new QueueEntry(queue, record.entry_seq).init(record)
+            tmpList.addLast(entry)
+            item_count += 1
+            size_count += record.size
+          }
 
-            if ( item_delta!=0 || size_delta!=0 ) {
+          // we may need to adjust the enqueue count if entries
+          // were dropped at the store level
+          var item_delta = (count - item_count)
+          val size_delta: Int = size - size_count
+
+          if ( item_delta!=0 || size_delta!=0 ) {
+            if ( attempt_counter < 10) {
+              warn("Retrying "+attempt_counter+" load do to Queue '%s' detected store change in range [%d:%d]. %d message(s) and %d bytes", queue.id, seq, last, item_delta, size_delta)
+              attempt_load(attempt_counter+1)
+              return
+            } else {
               warn("Queue '%s' detected store change in range [%d:%d]. %d message(s) and %d bytes", queue.id, seq, last, item_delta, size_delta)
               queue.enqueue_item_counter += item_delta
               queue.enqueue_size_counter += size_delta
             }
+          } else if( attempt_counter > 1 ) {
+            warn("Recoved!!!! @ "+attempt_counter)
+          }
 
-            linkAfter(tmpList)
-            val next = getNext
+          loading  = false
+          linkAfter(tmpList)
+          val next = getNext
+
+          // move the subs to the first entry that we just loaded.
+          parked.foreach(_.advance(next))
+          next :::= parked
+          queue.trigger_swap
 
-            // move the subs to the first entry that we just loaded.
-            parked.foreach(_.advance(next))
-            next :::= parked
-            queue.trigger_swap
+          unlink
+        }
 
-            unlink
+        def attempt_load(attempt_counter:Int):Unit = {
+          queue.virtual_host.store.list_queue_entries(queue.store_id, seq, last) { records =>
+            queue.dispatch_queue {
+              complete_load(attempt_counter, records)
+            }
           }
         }
+        attempt_load(1)
       }
     }
 



Mime
View raw message