cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r1373849 - in /cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp: IoSessionInputStream.java IoSessionOutputStream.java UDPConduit.java UDPDestination.java
Date Thu, 16 Aug 2012 14:14:09 GMT
Author: dkulp
Date: Thu Aug 16 14:14:09 2012
New Revision: 1373849

URL: http://svn.apache.org/viewvc?rev=1373849&view=rev
Log:
Update UDP transport to properly mark end of streams so it works with the Logging interceptors

Added:
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionOutputStream.java
Modified:
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java

Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java?rev=1373849&r1=1373848&r2=1373849&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java
(original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java
Thu Aug 16 14:14:09 2012
@@ -25,143 +25,86 @@ import java.io.InputStream;
 import org.apache.mina.core.buffer.IoBuffer;
 
 
-// Copies almost ver-batim from Mina due to the version in Mina not being public
 public class IoSessionInputStream extends InputStream {
-    private final Object mutex = new Object();
-    private final IoBuffer buf;
-    private volatile boolean closed;
-    private volatile boolean released;
-    private IOException exception;
+    private volatile IoBuffer buf;
+    private volatile IOException exception;
 
+    public IoSessionInputStream(IoBuffer b) {
+        buf = IoBuffer.allocate(b.limit());
+        buf.put(b);
+        buf.flip();
+    }
     public IoSessionInputStream() {
-        buf = IoBuffer.allocate(2048);
-        buf.setAutoExpand(true);
-        buf.limit(0);
+        buf = null;
     }
 
     @Override
-    public int available() {
-        if (released) {
-            return 0;
+    public int available() throws IOException {
+        if (exception != null) {
+            throw exception;
         }
-
-        synchronized (mutex) {
-            return buf.remaining();
+        if (buf == null) {
+            return 0;
         }
+        return buf.remaining();
     }
 
     @Override
-    public void close() {
-        if (closed) {
-            return;
-        }
-
-        synchronized (mutex) {
-            closed = true;
-            releaseBuffer();
-
-            mutex.notifyAll();
+    public void close() throws IOException {
+        if (exception != null) {
+            throw exception;
         }
     }
 
     @Override
     public int read() throws IOException {
-        synchronized (mutex) {
-            if (!waitForData()) {
-                return -1;
-            }
-
-            return buf.get() & 0xff;
+        waitForData();
+        if (exception != null) {
+            throw exception;
         }
+        return buf.get() & 0xff;
     }
 
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        synchronized (mutex) {
-            if (!waitForData()) {
-                return -1;
-            }
-
-            int readBytes;
-
-            if (len > buf.remaining()) {
-                readBytes = buf.remaining();
-            } else {
-                readBytes = len;
+    public synchronized void waitForData() throws IOException {
+        if (exception == null && buf == null) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw new IOException();
             }
-
-            buf.get(b, off, readBytes);
-
-            return readBytes;
         }
     }
-
-    private boolean waitForData() throws IOException {
-        if (released) {
-            return false;
-        }
-
-        synchronized (mutex) {
-            while (!released && buf.remaining() == 0 && exception == null) {
-                try {
-                    mutex.wait();
-                } catch (InterruptedException e) {
-                    IOException ioe = new IOException(
-                            "Interrupted while waiting for more data");
-                    ioe.initCause(e);
-                    throw ioe;
-                }
-            }
-        }
-
+    
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        waitForData();
         if (exception != null) {
-            releaseBuffer();
             throw exception;
         }
-
-        if (closed && buf.remaining() == 0) {
-            releaseBuffer();
-
-            return false;
+        if (buf.remaining() == 0) {
+            return -1;
         }
-
-        return true;
-    }
-
-    private void releaseBuffer() {
-        if (released) {
-            return;
+        int readBytes;
+        if (len > buf.remaining()) {
+            readBytes = buf.remaining();
+        } else {
+            readBytes = len;
         }
-
-        released = true;
+        buf.get(b, off, readBytes);
+        return readBytes;
     }
 
-    public void write(IoBuffer src) {
-        synchronized (mutex) {
-            if (closed) {
-                return;
-            }
-
-            if (buf.hasRemaining()) {
-                this.buf.compact();
-                this.buf.put(src);
-                this.buf.flip();
-            } else {
-                this.buf.clear();
-                this.buf.put(src);
-                this.buf.flip();
-                mutex.notifyAll();
-            }
+    public synchronized void throwException(IOException e) {
+        if (exception == null) {
+            exception = e;
         }
+        notifyAll();
     }
 
-    public void throwException(IOException e) {
-        synchronized (mutex) {
-            if (exception == null) {
-                exception = e;
-
-                mutex.notifyAll();
-            }
-        }
+    public synchronized void setBuffer(IoBuffer b) {
+        buf = IoBuffer.allocate(b.limit());
+        buf.put(b);
+        buf.flip();
+        notifyAll();
     }
 }
\ No newline at end of file

Added: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionOutputStream.java?rev=1373849&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionOutputStream.java
(added)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionOutputStream.java
Thu Aug 16 14:14:09 2012
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.udp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.future.WriteFuture;
+import org.apache.mina.core.session.IoSession;
+
+class IoSessionOutputStream extends OutputStream {
+    private final IoSession session;
+
+    private WriteFuture lastWriteFuture;
+
+    public IoSessionOutputStream(IoSession session) {
+        this.session = session;
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            flush();
+        } finally {
+            session.close(true).awaitUninterruptibly();
+        }
+    }
+
+    private void checkClosed() throws IOException {
+        if (!session.isConnected()) {
+            throw new IOException("The session has been closed.");
+        }
+    }
+
+    private synchronized void write(IoBuffer buf) throws IOException {
+        checkClosed();
+        WriteFuture future = session.write(buf);
+        lastWriteFuture = future;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        write(IoBuffer.wrap(b.clone(), off, len));
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        IoBuffer buf = IoBuffer.allocate(1);
+        buf.put((byte) b);
+        buf.flip();
+        write(buf);
+    }
+
+    @Override
+    public synchronized void flush() throws IOException {
+        if (lastWriteFuture == null) {
+            return;
+        }
+
+        lastWriteFuture.awaitUninterruptibly();
+        if (!lastWriteFuture.isWritten()) {
+            throw new IOException(
+                    "The bytes could not be written to the session");
+        }
+    }
+}

Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1373849&r1=1373848&r2=1373849&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
(original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
Thu Aug 16 14:14:09 2012
@@ -85,8 +85,7 @@ public class UDPConduit extends Abstract
             inMessage.setExchange(message.getExchange());
             message.getExchange().setInMessage(inMessage);
             
-            IoSessionInputStream ins = new IoSessionInputStream();
-            ins.write((IoBuffer)buf);
+            IoSessionInputStream ins = new IoSessionInputStream(buf);
             inMessage.setContent(InputStream.class, ins);
             inMessage.put(IoSessionInputStream.class, ins);
             
@@ -107,7 +106,7 @@ public class UDPConduit extends Abstract
             
         } else {
             IoSessionInputStream ins = message.getExchange().getInMessage().get(IoSessionInputStream.class);
-            ins.write((IoBuffer)buf);
+            ins.setBuffer((IoBuffer)buf);
         }
     }
     

Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java?rev=1373849&r1=1373848&r2=1373849&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
(original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
Thu Aug 16 14:14:09 2012
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.util.logging.Logger;
 
@@ -41,6 +42,8 @@ import org.apache.cxf.workqueue.WorkQueu
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.service.IoHandler;
+import org.apache.mina.core.session.AttributeKey;
+import org.apache.mina.core.session.IdleStatus;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.handler.stream.StreamIoHandler;
 import org.apache.mina.transport.socket.DatagramSessionConfig;
@@ -51,6 +54,8 @@ import org.apache.mina.transport.socket.
  */
 public class UDPDestination extends AbstractDestination {
     private static final Logger LOG = LogUtils.getL7dLogger(UDPDestination.class); 
+    private static final AttributeKey KEY_IN = new AttributeKey(StreamIoHandler.class, "in");
+    private static final AttributeKey KEY_OUT = new AttributeKey(StreamIoHandler.class, "out");
     
     NioDatagramAcceptor acceptor;
     AutomaticWorkQueue queue;
@@ -134,11 +139,27 @@ public class UDPDestination extends Abst
     
     
     class UDPIOHandler extends StreamIoHandler implements IoHandler {
-
+        
+        
+        @Override
+        public void sessionOpened(IoSession session) {
+            // Set timeouts
+            session.getConfig().setWriteTimeout(getWriteTimeout());
+            session.getConfig().setIdleTime(IdleStatus.READER_IDLE, getReadTimeout());
+
+            // Create streams
+            InputStream in = new IoSessionInputStream();
+            OutputStream out = new IoSessionOutputStream(session);
+            session.setAttribute(KEY_IN, in);
+            session.setAttribute(KEY_OUT, out);
+            processStreamIo(session, in, out);
+        }
+        
         protected void processStreamIo(IoSession session, InputStream in, OutputStream out) {
             final MessageImpl m = new MessageImpl();
             final Exchange exchange = new ExchangeImpl();
             exchange.setDestination(UDPDestination.this);
+            m.setDestination(UDPDestination.this);
             exchange.setInMessage(m);
             m.setContent(InputStream.class, in);
             out = new UDPDestinationOutputStream(out);
@@ -150,6 +171,52 @@ public class UDPDestination extends Abst
             });
         }
         
+        public void sessionClosed(IoSession session) throws Exception {
+            final InputStream in = (InputStream) session.getAttribute(KEY_IN);
+            final OutputStream out = (OutputStream) session.getAttribute(KEY_OUT);
+            try {
+                in.close();
+            } finally {
+                out.close();
+            }
+        }
+
+        public void messageReceived(IoSession session, Object buf) {
+            final IoSessionInputStream in = (IoSessionInputStream) session
+                    .getAttribute(KEY_IN);
+            in.setBuffer((IoBuffer) buf);
+        }
+
+        public void exceptionCaught(IoSession session, Throwable cause) {
+            final IoSessionInputStream in = (IoSessionInputStream) session
+                    .getAttribute(KEY_IN);
+
+            IOException e = null;
+            if (cause instanceof StreamIoException) {
+                e = (IOException) cause.getCause();
+            } else if (cause instanceof IOException) {
+                e = (IOException) cause;
+            }
+
+            if (e != null && in != null) {
+                in.throwException(e);
+            } else {
+                session.close(true);
+            }
+        }
+        public void sessionIdle(IoSession session, IdleStatus status) {
+            if (status == IdleStatus.READER_IDLE) {
+                throw new StreamIoException(new SocketTimeoutException(
+                        "Read timeout"));
+            }
+        }
+    }
+    private static class StreamIoException extends RuntimeException {
+        private static final long serialVersionUID = 3976736960742503222L;
+
+        public StreamIoException(IOException cause) {
+            super(cause);
+        }
     }
     
     public class UDPDestinationOutputStream extends OutputStream {



Mime
View raw message