activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r383893 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/
Date Tue, 07 Mar 2006 14:07:24 GMT
Author: jstrachan
Date: Tue Mar  7 06:06:59 2006
New Revision: 383893

URL: http://svn.apache.org/viewcvs?rev=383893&view=rev
Log:
initial spike of a UDP based transport - completely untested so far :)

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/ByteBufferPool.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/package.html
  (with props)

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/ByteBufferPool.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/ByteBufferPool.java?rev=383893&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/ByteBufferPool.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/ByteBufferPool.java
Tue Mar  7 06:06:59 2006
@@ -0,0 +1,47 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.Service;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Represents a pool of {@link ByteBuffer} instances. 
+ * This strategy could just create new buffers for each call or
+ * it could pool them.
+ * 
+ * @version $Revision$
+ */
+public interface ByteBufferPool extends Service {
+
+    /**
+     * Extract a buffer from the pool.
+     */
+    ByteBuffer borrowBuffer();
+
+    /**
+     * Returns the buffer to the pool or just discards it for a non-pool strategy
+     */
+    void returnBuffer(ByteBuffer buffer);
+
+    /**
+     * Sets the default size of the buffers
+     */
+    void setDefaultSize(int defaultSize);
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/ByteBufferPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/ByteBufferPool.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/ByteBufferPool.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=383893&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
Tue Mar  7 06:06:59 2006
@@ -0,0 +1,171 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.openwire.BooleanStream;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.Channels;
+
+/**
+ * A strategy for reading datagrams and de-fragmenting them together.
+ * 
+ * @version $Revision$
+ */
+public class CommandChannel implements Service {
+
+    private static final Log log = LogFactory.getLog(CommandChannel.class);
+    
+    private ByteChannel channel;
+    private OpenWireFormat wireFormat;
+    private ByteBufferPool bufferPool;
+    private int datagramSize = 4 * 1024;
+    private DatagramHeaderMarshaller headerMarshaller = new DatagramHeaderMarshaller();
+    
+    // reading
+    private ByteBuffer readBuffer;
+    private DataInputStream dataIn;
+    private CommandReadBuffer readStack;
+    
+    // writing
+    private ByteBuffer writeBuffer;
+    private BooleanStream bs = new BooleanStream(); 
+    private DataOutputStream dataOut;
+    private int largeMessageBufferSize = 128 * 1024;
+    private DatagramHeader header = new DatagramHeader();
+
+
+    public CommandChannel(ByteChannel channel, OpenWireFormat wireFormat, ByteBufferPool
bufferPool, int datagramSize) {
+        this.channel = channel;
+        this.wireFormat = wireFormat;
+        this.bufferPool = bufferPool;
+        this.datagramSize = datagramSize;
+    }
+
+    public void start() throws Exception {
+        readStack = new CommandReadBuffer(wireFormat);
+        bufferPool.setDefaultSize(datagramSize);
+        bufferPool.start();
+        readBuffer = bufferPool.borrowBuffer();
+        writeBuffer = bufferPool.borrowBuffer();
+        dataIn = new DataInputStream(Channels.newInputStream(channel));
+        dataOut = new DataOutputStream(Channels.newOutputStream(channel));
+    }
+
+    public void stop() throws Exception {
+        bufferPool.stop();
+    }
+    
+    public synchronized Command read() throws IOException {
+        readBuffer.clear();
+        int read = channel.read(readBuffer);
+        DatagramHeader header = headerMarshaller.readHeader(readBuffer);
+
+        int remaining = readBuffer.remaining();
+        int size = header.getDataSize();
+        if (size > remaining) {
+            throw new IOException("Invalid command size: " + size + " when there are only:
" + remaining + " byte(s) remaining");
+        }
+        else if (size < remaining) {
+            log.warn("Extra bytes in buffer. Expecting: " + size + " but has: " + remaining);
+        }
+        if (header.isPartial()) {
+            byte[] data = new byte[size];
+            readBuffer.get(data);
+            header.setPartialData(data);
+        }
+        else {
+            Command command = (Command) wireFormat.unmarshal(dataIn);
+            header.setCommand(command);
+        }
+
+        return readStack.read(header);
+    }
+
+    public synchronized void write(Command command) throws IOException {
+        header.incrementCounter();
+        int size = wireFormat.tightMarshalNestedObject1(command, bs);
+        if (size < datagramSize ) {
+            header.setPartial(false);
+            writeBuffer.rewind();
+            wireFormat.marshal(command, dataOut);
+            dataOut.flush();
+            channel.write(writeBuffer);
+        }
+        else {
+            header.setPartial(true);
+            header.setComplete(false);
+            
+            // lets split the command up into chunks
+            ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize);
+            wireFormat.marshal(command, new DataOutputStream(largeBuffer));
+            
+            byte[] data = largeBuffer.toByteArray();
+            int offset = 0;
+            boolean lastFragment = false;
+            for (int fragment = 0, length = data.length; !lastFragment; fragment++ ) {
+                // write the header
+                writeBuffer.rewind();
+                int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
+                lastFragment = offset + chunkSize >= length;
+                header.setComplete(lastFragment);
+                headerMarshaller.writeHeader(header, writeBuffer);
+
+                // now the data
+                writeBuffer.put(data, offset, chunkSize);
+                offset += chunkSize;
+                channel.write(writeBuffer);
+            }
+        }
+    }
+    
+    // Properties
+    // -------------------------------------------------------------------------
+
+    public int getDatagramSize() {
+        return datagramSize;
+    }
+
+    /**
+     * Sets the default size of a datagram on the network. 
+     */
+    public void setDatagramSize(int datagramSize) {
+        this.datagramSize = datagramSize;
+    }
+
+    public ByteBufferPool getBufferPool() {
+        return bufferPool;
+    }
+
+    /**
+     * Sets the implementation of the byte buffer pool to use
+     */
+    public void setBufferPool(ByteBufferPool bufferPool) {
+        this.bufferPool = bufferPool;
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java?rev=383893&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
Tue Mar  7 06:06:59 2006
@@ -0,0 +1,81 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.openwire.OpenWireFormat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Buffers up incoming headers to reorder them. This class is only accessed by
+ * one thread at once.
+ * 
+ * @version $Revision$
+ */
+public class CommandReadBuffer {
+
+    private OpenWireFormat wireFormat;
+    private SortedSet headers = new TreeSet();
+    private int expectedCounter;
+    private ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    public CommandReadBuffer(OpenWireFormat wireFormat) {
+        this.wireFormat = wireFormat;
+    }
+
+    public Command read(DatagramHeader header) throws IOException {
+        if (expectedCounter != header.getCounter()) {
+            // lets add it to the list for later on
+            headers.add(header);
+
+            // lets see if the first item in the set is the next header
+            header = (DatagramHeader) headers.first();
+            if (expectedCounter != header.getCounter()) {
+                return null;
+            }
+        }
+
+        // we've got a valid header so increment counter
+        expectedCounter++;
+
+        Command answer = null;
+        if (!header.isPartial()) {
+            answer = header.getCommand();
+            if (answer == null) {
+                throw new IllegalStateException("The header should have a command!: " + header);
+            }
+        }
+        else {
+            byte[] data = header.getPartialData();
+            out.write(data);
+
+            if (header.isComplete()) {
+                answer = (Command) wireFormat.unmarshal(new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
+                out.reset();
+            }
+        }
+        return answer;
+
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java?rev=383893&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
Tue Mar  7 06:06:59 2006
@@ -0,0 +1,147 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.command.Command;
+
+/**
+ * Represents a header used when sending data grams
+ * 
+ * @version $Revision$
+ */
+public class DatagramHeader implements Comparable {
+
+    private String producerId;
+    private long counter;
+    private boolean partial;
+    private boolean complete;
+    private int dataSize;
+
+    // transient caches
+    private transient byte[] partialData;
+    private transient Command command;
+
+    public int hashCode() {
+        final int PRIME = 31;
+        int result = 1;
+        result = PRIME * result + (int) (counter ^ (counter >>> 32));
+        return result;
+    }
+
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        final DatagramHeader other = (DatagramHeader) obj;
+        if (counter != other.counter)
+            return false;
+        return true;
+    }
+
+    public int compareTo(DatagramHeader that) {
+        return (int) (this.counter - that.counter);
+    }
+
+    public int compareTo(Object that) {
+        if (that instanceof DatagramHeader) {
+            return compareTo((DatagramHeader) that);
+        }
+        return getClass().getName().compareTo(that.getClass().getName());
+    }
+
+    public boolean isComplete() {
+        return complete;
+    }
+
+    public void setComplete(boolean complete) {
+        this.complete = complete;
+    }
+
+    public long getCounter() {
+        return counter;
+    }
+
+    public void setCounter(long counter) {
+        this.counter = counter;
+    }
+
+    public boolean isPartial() {
+        return partial;
+    }
+
+    public void setPartial(boolean partial) {
+        this.partial = partial;
+    }
+
+    public String getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(String producerId) {
+        this.producerId = producerId;
+    }
+
+    public int getDataSize() {
+        return dataSize;
+    }
+
+    public void setDataSize(int dataSize) {
+        this.dataSize = dataSize;
+    }
+
+    public void incrementCounter() {
+        counter++;
+    }
+
+    public byte getFlags() {
+        byte answer = 0;
+        if (partial) {
+            answer |= 0x1;
+        }
+        if (complete) {
+            answer |= 0x2;
+        }
+        return answer;
+    }
+
+    public void setFlags(byte flags) {
+        partial = (flags & 0x1) == 0;
+        complete = (flags & 0x2) == 0;
+    }
+
+    public Command getCommand() {
+        return command;
+    }
+
+    public void setCommand(Command command) {
+        this.command = command;
+    }
+
+    public byte[] getPartialData() {
+        return partialData;
+    }
+
+    public void setPartialData(byte[] partialData) {
+        this.partialData = partialData;
+    }
+
+    // Transient cached properties
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java?rev=383893&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
Tue Mar  7 06:06:59 2006
@@ -0,0 +1,45 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+
+import java.nio.ByteBuffer;
+
+/**
+ * 
+ * @version $Revision$
+ */
+public class DatagramHeaderMarshaller {
+
+    public DatagramHeader readHeader(ByteBuffer readBuffer) {
+        DatagramHeader answer = new DatagramHeader();
+        answer.setCounter(readBuffer.getLong());
+        byte flags = readBuffer.get();
+        answer.setFlags(flags);
+        return answer;
+    }
+
+    public void writeHeader(DatagramHeader header, ByteBuffer writeBuffer) {
+        writeBuffer.putLong(header.getCounter());
+        writeBuffer.put(header.getFlags());
+    }
+
+    public int getHeaderSize(DatagramHeader header) {
+        return 8 + 1;
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=383893&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
Tue Mar  7 06:06:59 2006
@@ -0,0 +1,195 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportThreadSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.channels.DatagramChannel;
+
+/**
+ * An implementation of the {@link Transport} interface using raw UDP
+ * 
+ * @version $Revision$
+ */
+public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable
{
+    private static final Log log = LogFactory.getLog(UdpTransport.class);
+
+    private CommandChannel commandChannel;
+    private OpenWireFormat wireFormat;
+    private ByteBufferPool bufferPool;
+    private int datagramSize = 4 * 1024;
+    private long maxInactivityDuration = 0; //30000;
+    private InetSocketAddress socketAddress;
+    private DatagramChannel channel;
+    private boolean trace = false;
+    private boolean useLocalHost = true;
+
+    protected UdpTransport(OpenWireFormat wireFormat) {
+        this.wireFormat = wireFormat;
+    }
+
+    public UdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException,
IOException {
+        this(wireFormat);
+        this.socketAddress = createAddress(remoteLocation);
+    }
+
+    public UdpTransport(OpenWireFormat wireFormat, InetSocketAddress socketAddress) {
+        this(wireFormat);
+        this.socketAddress = socketAddress;
+    }
+
+    /**
+     * A one way asynchronous send
+     */
+    public void oneway(Command command) throws IOException {
+        checkStarted(command);
+        commandChannel.write(command);
+    }
+
+    /**
+     * @return pretty print of 'this'
+     */
+    public String toString() {
+        return "udp://" + socketAddress;
+    }
+
+    /**
+     * reads packets from a Socket
+     */
+    public void run() {
+        log.trace("TCP consumer thread starting");
+        while (!isClosed()) {
+            try {
+                Command command = commandChannel.read();
+                doConsume(command);
+            }
+            catch (SocketTimeoutException e) {
+            }
+            catch (InterruptedIOException e) {
+            }
+            catch (IOException e) {
+                try {
+                    stop();
+                }
+                catch (Exception e2) {
+                    log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
+                }
+                onException(e);
+            }
+        }
+    }
+
+    // Properties
+    // -------------------------------------------------------------------------
+    public boolean isTrace() {
+        return trace;
+    }
+
+    public void setTrace(boolean trace) {
+        this.trace = trace;
+    }
+
+    public long getMaxInactivityDuration() {
+        return maxInactivityDuration;
+    }
+
+    /**
+     * Sets the maximum inactivity duration
+     */
+    public void setMaxInactivityDuration(long maxInactivityDuration) {
+        this.maxInactivityDuration = maxInactivityDuration;
+    }
+    public boolean isUseLocalHost() {
+        return useLocalHost;
+    }
+
+    /**
+     * Sets whether 'localhost' or the actual local host name should be used to
+     * make local connections. On some operating systems such as Macs its not
+     * possible to connect as the local host name so localhost is better.
+     */
+    public void setUseLocalHost(boolean useLocalHost) {
+        this.useLocalHost = useLocalHost;
+    }
+
+
+    public CommandChannel getCommandChannel() {
+        return commandChannel;
+    }
+
+    /**
+     * Sets the implementation of the command channel to use.
+     */
+    public void setCommandChannel(CommandChannel commandChannel) {
+        this.commandChannel = commandChannel;
+    }
+    
+    // Implementation methods
+    // -------------------------------------------------------------------------
+
+
+    /**
+     * Creates an address from the given URI
+     */
+    protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException,
IOException {
+        String host = resolveHostName(remoteLocation.getHost());
+        return new InetSocketAddress(host, remoteLocation.getPort());
+    }
+
+    protected String resolveHostName(String host) throws UnknownHostException {
+        String localName = InetAddress.getLocalHost().getHostName();
+        if (localName != null && isUseLocalHost()) {
+            if (localName.equals(host)) {
+                return "localhost";
+            }
+        }
+        return host;
+    }
+
+    protected void doStart() throws Exception {
+        if (socketAddress != null) {
+            channel = DatagramChannel.open();
+            channel.connect(socketAddress);
+        }
+        else if (channel == null) {
+            throw new IllegalArgumentException("No channel configured");
+        }
+        commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize);
+        commandChannel.start();
+        super.doStart();
+    }
+
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        if (channel != null) {
+            channel.close();
+        }
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=383893&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
Tue Mar  7 06:06:59 2006
@@ -0,0 +1,124 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+import org.activeio.command.WireFormat;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.ResponseCorrelator;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportLogger;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.WireFormatNegotiator;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class UdpTransportFactory extends TransportFactory {
+    private static final Log log = LogFactory.getLog(UdpTransportFactory.class);
+
+    public TransportServer doBind(String brokerId, final URI location) throws IOException
{
+        throw new IOException("TransportServer not supported for UDP");
+        /*
+        try {
+            Map options = new HashMap(URISupport.parseParamters(location));
+
+            return null;
+            UdpTransportServer server = new UdpTransportServer(location);
+            server.setWireFormatFactory(createWireFormatFactory(options));
+            IntrospectionSupport.setProperties(server, options);
+
+            return server;
+        }
+        catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
+        */
+    }
+
+    public Transport configure(Transport transport, WireFormat format, Map options) {
+        IntrospectionSupport.setProperties(transport, options);
+        UdpTransport tcpTransport = (UdpTransport) transport;
+        if (tcpTransport.isTrace()) {
+            transport = new TransportLogger(transport);
+        }
+
+        if (tcpTransport.getMaxInactivityDuration() > 0) {
+            transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
+        }
+
+        transport = new MutexTransport(transport);
+        transport = new ResponseCorrelator(transport);
+        return transport;
+    }
+
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options)
{
+        IntrospectionSupport.setProperties(transport, options);
+        UdpTransport tcpTransport = (UdpTransport) transport;
+        if (tcpTransport.isTrace()) {
+            transport = new TransportLogger(transport);
+        }
+
+        if (tcpTransport.getMaxInactivityDuration() > 0) {
+            transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
+        }
+        return transport;
+    }
+
+    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException,
IOException {
+        /*
+        URI localLocation = null;
+        String path = location.getPath();
+        // see if the path is a local URI location
+        if (path != null && path.length() > 0) {
+            int localPortIndex = path.indexOf(':');
+            try {
+                Integer.parseInt(path.substring((localPortIndex + 1), path.length()));
+                String localString = location.getScheme() + ":/" + path;
+                localLocation = new URI(localString);
+            }
+            catch (Exception e) {
+                log.warn("path isn't a valid local location for TcpTransport to use", e);
+            }
+        }
+        if (localLocation != null) {
+            return new UdpTransport(wf, location, localLocation);
+        }
+        */
+        return new UdpTransport((OpenWireFormat) wf, location);
+    }
+
+    protected ServerSocketFactory createServerSocketFactory() {
+        return ServerSocketFactory.getDefault();
+    }
+
+    protected SocketFactory createSocketFactory() {
+        return SocketFactory.getDefault();
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/package.html
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/package.html?rev=383893&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/package.html
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/package.html
Tue Mar  7 06:06:59 2006
@@ -0,0 +1,9 @@
+<html>
+<head>
+</head>
+<body>
+
+UDP based Transport implementation.
+
+</body>
+</html>

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/package.html
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/package.html
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/package.html
------------------------------------------------------------------------------
    svn:mime-type = text/html



Mime
View raw message