activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r813703 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/nio/ main/java/org/apache/activemq/transport/stomp/ main/resources/META-INF/services/org/apache/activemq/transport/ test/java/org/apache/activemq/transp...
Date Fri, 11 Sep 2009 07:57:05 GMT
Author: dejanb
Date: Fri Sep 11 07:57:05 2009
New Revision: 813703

URL: http://svn.apache.org/viewvc?rev=813703&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2239 - stomp+nio transport implementation

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOTest.java
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/niostomp-auth-broker.xml

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java?rev=813703&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java	(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java	Fri Sep 11 07:57:05 2009
@@ -0,0 +1,287 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+/**
+ * An {@link InputStream} backed by a non-blocking {@link SocketChannel}, a
+ * direct {@link ByteBuffer} and a private read {@link Selector}.
+ * <p>
+ * Data is served from the internal buffer. When the buffer is empty the stream
+ * reads from the channel and waits on the selector until bytes arrive. When the
+ * end of the stream is reached the stream closes itself, so later reads throw
+ * {@link IOException}. The class does no synchronization of its own.
+ */
+public class NIOBufferedInputStream extends InputStream {
+
+    private static final int BUFFER_SIZE = 8192;
+
+    private SocketChannel sc = null;
+
+    private ByteBuffer bb = null;
+
+    private Selector rs = null;
+
+    /**
+     * Wraps the given channel and switches it to non-blocking mode.
+     *
+     * @param channel the channel to read from; it is cast to {@link SocketChannel}
+     * @param size capacity in bytes of the internal direct buffer
+     * @throws IllegalArgumentException if size is not positive
+     * @throws ClosedChannelException if the channel is closed
+     * @throws IOException if the channel or the selector cannot be set up
+     */
+    public NIOBufferedInputStream(ReadableByteChannel channel, int size)
+            throws ClosedChannelException, IOException {
+
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+
+        this.bb = ByteBuffer.allocateDirect(size);
+        this.sc = (SocketChannel) channel;
+
+        this.sc.configureBlocking(false);
+
+        this.rs = Selector.open();
+
+        // Do not leak the selector if the registration fails.
+        boolean registered = false;
+        try {
+            sc.register(rs, SelectionKey.OP_READ);
+            registered = true;
+        } finally {
+            if (!registered) {
+                rs.close();
+            }
+        }
+
+        // Start with an empty buffer in read mode.
+        bb.position(0);
+        bb.limit(0);
+    }
+
+    /**
+     * Wraps the given channel using the default buffer size.
+     *
+     * @param channel the channel to read from; it is cast to {@link SocketChannel}
+     * @throws ClosedChannelException if the channel is closed
+     * @throws IOException if the channel or the selector cannot be set up
+     */
+    public NIOBufferedInputStream(ReadableByteChannel channel)
+            throws ClosedChannelException, IOException {
+        this(channel, BUFFER_SIZE);
+    }
+
+    /**
+     * Returns the number of bytes currently held in the internal buffer.
+     *
+     * @throws IOException if the stream is closed
+     */
+    public int available() throws IOException {
+        if (!rs.isOpen())
+            throw new IOException("Input Stream Closed");
+
+        return bb.remaining();
+    }
+
+    /**
+     * Closes the selector and, if the channel is still open, its socket.
+     * Calling this method on a closed stream has no effect.
+     *
+     * @throws IOException if closing the selector or the socket fails
+     */
+    public void close() throws IOException {
+        if (rs.isOpen()) {
+            rs.close();
+
+            try {
+                if (sc.isOpen()) {
+                    try {
+                        sc.socket().shutdownInput();
+                    } finally {
+                        // Close the socket even if the shutdown fails.
+                        sc.socket().close();
+                    }
+                }
+            } finally {
+                bb = null;
+                sc = null;
+            }
+        }
+    }
+
+    /**
+     * Reads one byte, waiting for data if the buffer is empty.
+     *
+     * @return the byte as a value from 0 to 255, or -1 at the end of the stream
+     * @throws IOException if the stream is closed or the read fails
+     */
+    public int read() throws IOException {
+        if (!rs.isOpen())
+            throw new IOException("Input Stream Closed");
+
+        if (!bb.hasRemaining()) {
+            try {
+                fill(1);
+            } catch (ClosedChannelException e) {
+                close();
+                return -1;
+            }
+        }
+
+        return (bb.get() & 0xFF);
+    }
+
+    /**
+     * Copies up to len buffered bytes into b, waiting for data only if the
+     * buffer is empty.
+     *
+     * @param b destination array
+     * @param off index in b of the first byte written
+     * @param len maximum number of bytes to copy
+     * @return the number of bytes copied, 0 if len is 0, or -1 at the end of
+     *         the stream
+     * @throws IndexOutOfBoundsException if off and len do not fit in b
+     * @throws IOException if the stream is closed or the read fails
+     */
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (!rs.isOpen())
+            throw new IOException("Input Stream Closed");
+
+        if ((off < 0) || (len < 0) || (len > b.length - off))
+            throw new IndexOutOfBoundsException();
+
+        // A zero-length request must not wait for data.
+        if (len == 0)
+            return 0;
+
+        if (!bb.hasRemaining()) {
+            try {
+                fill(1);
+            } catch (ClosedChannelException e) {
+                close();
+                return -1;
+            }
+        }
+
+        int bytesCopied = Math.min(len, bb.remaining());
+        bb.get(b, off, bytesCopied);
+
+        return bytesCopied;
+    }
+
+    /**
+     * Skips up to n bytes, waiting for data as needed.
+     *
+     * @param n number of bytes to skip; values that are not positive skip nothing
+     * @return the number of bytes skipped, which is less than n only if the
+     *         end of the stream was reached
+     * @throws IOException if the stream is closed or the read fails
+     */
+    public long skip(long n) throws IOException {
+        long skipped = 0;
+
+        if (!rs.isOpen())
+            throw new IOException("Input Stream Closed");
+
+        while (n > 0) {
+            int buffered = bb.remaining();
+
+            if (n <= buffered) {
+                skipped += n;
+                bb.position(bb.position() + (int) n);
+                n = 0;
+            } else {
+                skipped += buffered;
+                n -= buffered;
+
+                bb.position(bb.limit());
+
+                try {
+                    // Clamp before narrowing: a plain (int) cast of a large
+                    // long can yield 0 or a negative value, which made fill()
+                    // return at once and this loop spin forever.
+                    fill((int) Math.min(n, (long) Integer.MAX_VALUE));
+                } catch (ClosedChannelException e) {
+                    close();
+                    return skipped;
+                }
+            }
+        }
+
+        return skipped;
+    }
+
+    /**
+     * Reads from the channel until n more bytes are buffered, the buffer is
+     * full or the end of the stream is reached. Does nothing if n is not
+     * positive or the buffer already holds n bytes.
+     *
+     * @param n number of bytes wanted
+     * @throws ClosedChannelException if the end of the stream is reached
+     *         before any byte could be read
+     * @throws IOException if the read or the select fails
+     */
+    private void fill(int n) throws IOException, ClosedChannelException {
+        if ((n <= 0) || (n <= bb.remaining()))
+            return;
+
+        bb.compact();
+
+        n = Math.min(bb.remaining(), n);
+
+        int filled = 0;
+
+        try {
+            for (;;) {
+                int bytesRead = sc.read(bb);
+
+                if (bytesRead == -1) {
+                    // Hand over bytes read before the end of the stream;
+                    // the next call reports the end.
+                    if (filled == 0)
+                        throw new ClosedChannelException();
+                    break;
+                }
+
+                filled += bytesRead;
+                n -= bytesRead;
+
+                if (n <= 0)
+                    break;
+
+                rs.select(0);
+                rs.selectedKeys().clear();
+            }
+        } finally {
+            // Always return to read mode, so a failed read never exposes
+            // unwritten buffer space as data.
+            bb.flip();
+        }
+    }
+}
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=813703&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java	(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java	Fri Sep 11 07:57:05 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
+
+import javax.net.SocketFactory;
+
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.nio.NIOBufferedInputStream;
+import org.apache.activemq.transport.nio.NIOOutputStream;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * An implementation of the {@link Transport} interface for using Stomp over NIO.
+ * The data streams are built on the {@link SocketChannel} of the socket.
+ *
+ * @version $Revision$
+ */
+public class StompNIOTransport extends TcpTransport {
+
+    private SocketChannel channel;
+
+    /**
+     * Creates a transport from a socket factory and locations; all arguments
+     * are passed to the superclass constructor.
+     */
+    public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+        super(wireFormat, socketFactory, remoteLocation, localLocation);
+    }
+
+    /**
+     * Creates a transport for an existing socket; all arguments are passed to
+     * the superclass constructor.
+     */
+    public StompNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
+        super(wireFormat, socket);
+    }
+
+    /**
+     * Puts the socket's channel into non-blocking mode and creates the data
+     * streams on top of it.
+     *
+     * @throws IOException if the socket has no channel or the channel cannot
+     *         be configured
+     */
+    protected void initializeStreams() throws IOException {
+        channel = socket.getChannel();
+
+        // Socket.getChannel() returns null unless the socket was created by a
+        // channel; report that instead of failing with a NullPointerException.
+        if (channel == null) {
+            throw new IOException("Socket has no associated SocketChannel: " + socket);
+        }
+
+        channel.configureBlocking(false);
+
+        this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024));
+        this.dataIn = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024));
+    }
+
+}

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java?rev=813703&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java	(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java	Fri Sep 11 07:57:05 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.Map;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.nio.NIOTransport;
+import org.apache.activemq.transport.nio.NIOTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.xbean.XBeanBrokerService;
+import org.springframework.context.ApplicationContext;
+
+/**
+ * Transport factory for <a href="http://stomp.codehaus.org/">STOMP</a> over NIO.
+ * Transports are created as {@link StompNIOTransport} instances and wrapped in
+ * a {@link StompTransportFilter}.
+ *
+ * @version $Revision: 645574 $
+ */
+public class StompNIOTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
+
+    /** Spring context of the broker; stays null unless an XBeanBrokerService is set. */
+    private ApplicationContext applicationContext;
+
+    /** Names the wire format used by default for this transport. */
+    protected String getDefaultWireFormatType() {
+        return "stomp";
+    }
+
+    /** Builds a server whose sockets are turned into {@link StompNIOTransport} instances. */
+    protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        return new TcpTransportServer(this, location, serverSocketFactory) {
+            protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+                return new StompNIOTransport(format, socket);
+            }
+        };
+    }
+
+    /** Builds a {@link StompNIOTransport} for the given locations. */
+    protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+        return new StompNIOTransport(wf, socketFactory, location, localLocation);
+    }
+
+    /**
+     * Wraps the transport in a {@link StompTransportFilter}, applies the
+     * options to the filter and then delegates to the superclass.
+     */
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+        Transport stompFilter = new StompTransportFilter(transport, new LegacyFrameTranslator(), applicationContext);
+        IntrospectionSupport.setProperties(stompFilter, options);
+        return super.compositeConfigure(stompFilter, format, options);
+    }
+
+    /**
+     * Always disables the inactivity monitor, as stomp does not use keep
+     * alive packets.
+     */
+    protected boolean isUseInactivityMonitor(Transport transport) {
+        return false;
+    }
+
+    /**
+     * Picks up the application context when the broker is an
+     * {@link XBeanBrokerService}; other brokers are ignored.
+     */
+    public void setBrokerService(BrokerService brokerService) {
+        if (!(brokerService instanceof XBeanBrokerService)) {
+            return;
+        }
+        XBeanBrokerService xbeanBroker = (XBeanBrokerService) brokerService;
+        this.applicationContext = xbeanBroker.getApplicationContext();
+    }
+
+}
+

Added: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp%2Bnio?rev=813703&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio	(added)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio	Fri Sep 11 07:57:05 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.stomp.StompNIOTransportFactory

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOTest.java?rev=813703&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOTest.java	(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOTest.java	Fri Sep 11 07:57:05 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+/**
+ * Variant of {@link StompTest} configured for the stomp+nio transport.
+ *
+ * @version $Revision: 732672 $
+ */
+public class StompNIOTest extends StompTest {
+
+    /**
+     * Selects the stomp+nio bind address and the matching broker
+     * configuration before running the inherited set up.
+     */
+    protected void setUp() throws Exception {
+        confUri = "xbean:org/apache/activemq/transport/stomp/niostomp-auth-broker.xml";
+        bindAddress = "stomp+nio://localhost:61612";
+        super.setUp();
+    }
+}

Added: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/niostomp-auth-broker.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/niostomp-auth-broker.xml?rev=813703&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/niostomp-auth-broker.xml	(added)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/niostomp-auth-broker.xml	Fri Sep 11 07:57:05 2009
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: example -->
+<beans>
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+  
+  <bean class="org.apache.activemq.util.XStreamFactoryBean" name="xstream">
+        <property name="annotatedClass"><value>org.apache.activemq.transport.stomp.SamplePojo</value></property>
+  </bean>
+
+  <broker useJmx="true" persistent="false" xmlns="http://activemq.org/config/1.0" populateJMSXUserID="true">
+
+    <transportConnectors>
+      <transportConnector name="stomp+nio"   uri="stomp+nio://localhost:61612"/>
+    </transportConnectors>
+
+    <plugins>
+		<simpleAuthenticationPlugin>
+			<users>
+				<authenticationUser username="system" password="manager"
+					groups="users,admins"/>
+				<authenticationUser username="user" password="password"
+					groups="users"/>
+				<authenticationUser username="guest" password="password" groups="guests"/>
+			</users>
+		</simpleAuthenticationPlugin>
+
+
+      <!--  lets configure a destination based authorization mechanism -->
+      <authorizationPlugin>
+        <map>
+          <authorizationMap>
+            <authorizationEntries>
+              <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
+              <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
+              <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
+              
+              <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
+              <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
+              <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
+              
+              <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
+            </authorizationEntries>
+          </authorizationMap>
+        </map>
+      </authorizationPlugin>
+    </plugins>
+  </broker>
+
+</beans>
\ No newline at end of file



Mime
View raw message