activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r379619 [4/30] - in /incubator/activemq/trunk: ./ activecluster/ activecluster/src/java/org/apache/activecluster/ activecluster/src/java/org/apache/activecluster/election/ activecluster/src/java/org/apache/activecluster/election/impl/ activ...
Date Tue, 21 Feb 2006 23:14:17 GMT
Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncChannelToSocket.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncChannelToSocket.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncChannelToSocket.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncChannelToSocket.java Tue Feb 21 15:12:56 2006
@@ -1,221 +1,221 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.adapter;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.channels.SocketChannel;
-
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.sync.SyncChannel;
-import org.apache.activeio.stream.sync.socket.SocketMetadata;
-
-/**
- * Provides a {@see java.net.Socket} interface to a {@see org.apache.activeio.SynchChannel}.
- * 
- * If the {@see org.apache.activeio.SynchChannel} being adapted can not be 
- * {@see org.apache.activeio.Channel#narrow(Class)}ed to a {@see org.apache.activeio.net.SocketMetadata} 
- * then all methods accessing socket metadata will throw a {@see java.net.SocketException}.
- *  
- */
-public class SyncChannelToSocket extends Socket {
-    
-    private final SyncChannel channel;
-    private final SyncChannelToInputStream inputStream;
-    private final SyncChannelToOutputStream outputStream;
-    private final SocketMetadata socketMetadata;
-    private final Packet urgentPackget = new ByteArrayPacket(new byte[1]);
-    boolean closed;
-
-    public SyncChannelToSocket(SyncChannel channel) {
-        this(channel, (SocketMetadata)channel.getAdapter(SocketMetadata.class));
-    }
-    
-    public SyncChannelToSocket(SyncChannel channel, SocketMetadata socketMetadata) {
-        this.channel = channel;
-        this.socketMetadata = socketMetadata;
-        this.inputStream = new SyncChannelToInputStream(channel);
-        this.outputStream = new SyncChannelToOutputStream(channel);
-    }
-
-    public boolean isConnected() {
-        return true;
-    }
-    
-    public boolean isBound() {
-        return true;
-    }
-    
-    public boolean isClosed() {
-        return closed;
-    }
-
-    public void bind(SocketAddress bindpoint) throws IOException {
-        throw new IOException("Not supported");
-    }
-
-    public synchronized void close() throws IOException {
-        if( closed )
-            return;
-        closed = true;
-        inputStream.close();
-        outputStream.close();
-        channel.stop();
-    }
-    
-    public void connect(SocketAddress endpoint) throws IOException {
-        throw new IOException("Not supported");
-    }
-    
-    public void connect(SocketAddress endpoint, int timeout) throws IOException {
-        throw new IOException("Not supported");
-    }
-
-    public SocketChannel getChannel() {
-        return null;
-    }
-    
-    public InputStream getInputStream() throws IOException {
-        return inputStream;
-    }
-
-    public OutputStream getOutputStream() throws IOException {
-        return outputStream;
-    }
-    
-    public boolean isInputShutdown() {
-        return inputStream.isClosed();
-    }
-
-    public boolean isOutputShutdown() {
-        return outputStream.isClosed();
-    }
-    
-    public void sendUrgentData(int data) throws IOException {
-        urgentPackget.clear();
-        urgentPackget.write(data);
-        urgentPackget.flip();
-        channel.write(urgentPackget);
-    }
-
-    public int getSoTimeout() throws SocketException {
-        return (int) inputStream.getTimeout();
-    }
-    
-    public synchronized void setSoTimeout(int timeout) throws SocketException {
-        inputStream.setTimeout(timeout);
-    }    
-    
-    public void shutdownOutput() throws IOException {
-        outputStream.close();
-    }
-    
-    public void shutdownInput() throws IOException {
-        inputStream.close();
-    }
-    
-    protected SocketMetadata getSocketMetadata() throws SocketException {
-        if( socketMetadata == null )
-            throw new SocketException("No socket metadata available.");
-        return socketMetadata;
-    }
-    
-    public InetAddress getInetAddress() {
-        if( socketMetadata ==null )
-            return null;
-        return socketMetadata.getInetAddress();
-    }
-    public boolean getKeepAlive() throws SocketException {
-        return getSocketMetadata().getKeepAlive();
-    }
-    public InetAddress getLocalAddress() {
-        if( socketMetadata ==null )
-            return null;
-        return socketMetadata.getLocalAddress();
-    }
-    public int getLocalPort() {
-        if( socketMetadata ==null )
-            return -1;
-        return socketMetadata.getLocalPort();
-    }
-    public SocketAddress getLocalSocketAddress() {
-        if( socketMetadata ==null )
-            return null;
-        return socketMetadata.getLocalSocketAddress();
-    }
-    public boolean getOOBInline() throws SocketException {
-        return getSocketMetadata().getOOBInline();
-    }
-    public int getPort() {
-        if( socketMetadata ==null )
-            return -1;
-        return socketMetadata.getPort();
-    }
-    public int getReceiveBufferSize() throws SocketException {
-        return getSocketMetadata().getReceiveBufferSize();
-    }
-    public SocketAddress getRemoteSocketAddress() {
-        if( socketMetadata ==null )
-            return null;
-        return socketMetadata.getRemoteSocketAddress();
-    }
-    public boolean getReuseAddress() throws SocketException {
-        return getSocketMetadata().getReuseAddress();
-    }
-    public int getSendBufferSize() throws SocketException {
-        return getSocketMetadata().getSendBufferSize();
-    }
-    public int getSoLinger() throws SocketException {
-        return getSocketMetadata().getSoLinger();
-    }
-    public boolean getTcpNoDelay() throws SocketException {
-        return getSocketMetadata().getTcpNoDelay();
-    }
-    public int getTrafficClass() throws SocketException {
-        return getSocketMetadata().getTrafficClass();
-    }
-    public void setKeepAlive(boolean on) throws SocketException {
-        getSocketMetadata().setKeepAlive(on);
-    }
-    public void setOOBInline(boolean on) throws SocketException {
-        getSocketMetadata().setOOBInline(on);
-    }
-    public void setReceiveBufferSize(int size) throws SocketException {
-        getSocketMetadata().setReceiveBufferSize(size);
-    }
-    public void setReuseAddress(boolean on) throws SocketException {
-        getSocketMetadata().setReuseAddress(on);
-    }
-    public void setSendBufferSize(int size) throws SocketException {
-        getSocketMetadata().setSendBufferSize(size);
-    }
-    public void setSoLinger(boolean on, int linger) throws SocketException {
-        getSocketMetadata().setSoLinger(on, linger);
-    }
-    public void setTcpNoDelay(boolean on) throws SocketException {
-        getSocketMetadata().setTcpNoDelay(on);
-    }
-    public void setTrafficClass(int tc) throws SocketException {
-        getSocketMetadata().setTrafficClass(tc);
-    }
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.adapter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.SocketChannel;
+
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.sync.SyncChannel;
+import org.apache.activeio.stream.sync.socket.SocketMetadata;
+
+/**
+ * Exposes a {@link SyncChannel} through the {@link java.net.Socket} API.
+ *
+ * Stream access is served by a {@link SyncChannelToInputStream} and a
+ * {@link SyncChannelToOutputStream} built on the adapted channel. Socket
+ * options and addresses are delegated to a {@link SocketMetadata}; the
+ * single-argument constructor obtains it with
+ * {@code channel.getAdapter(SocketMetadata.class)}.
+ *
+ * When no {@link SocketMetadata} is available, the socket option accessors
+ * (keep-alive, buffer sizes, linger, ...) throw a
+ * {@link java.net.SocketException}, while the address and port getters
+ * return {@code null} or {@code -1}.
+ */
+public class SyncChannelToSocket extends Socket {
+
+    private final SyncChannel channel;
+    private final SyncChannelToInputStream inputStream;
+    private final SyncChannelToOutputStream outputStream;
+    /** May be null when the channel offers no socket metadata. */
+    private final SocketMetadata socketMetadata;
+    /** Reusable single-byte packet used by {@link #sendUrgentData(int)}. */
+    private final Packet urgentPacket = new ByteArrayPacket(new byte[1]);
+    boolean closed;
+
+    /**
+     * Adapts the channel, looking up its socket metadata through
+     * {@code channel.getAdapter(SocketMetadata.class)}.
+     *
+     * @param channel the channel to expose as a socket
+     */
+    public SyncChannelToSocket(SyncChannel channel) {
+        this(channel, (SocketMetadata)channel.getAdapter(SocketMetadata.class));
+    }
+
+    /**
+     * Adapts the channel using the supplied socket metadata.
+     *
+     * @param channel the channel to expose as a socket
+     * @param socketMetadata source of socket options and addresses; may be null
+     */
+    public SyncChannelToSocket(SyncChannel channel, SocketMetadata socketMetadata) {
+        this.channel = channel;
+        this.socketMetadata = socketMetadata;
+        this.inputStream = new SyncChannelToInputStream(channel);
+        this.outputStream = new SyncChannelToOutputStream(channel);
+    }
+
+    /** Always reports true. */
+    public boolean isConnected() {
+        return true;
+    }
+
+    /** Always reports true. */
+    public boolean isBound() {
+        return true;
+    }
+
+    /** @return true once {@link #close()} has been called */
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /** Binding is not supported; always throws. */
+    public void bind(SocketAddress bindpoint) throws IOException {
+        throw new IOException("Not supported");
+    }
+
+    /**
+     * Closes both stream adapters and stops the channel. Calling it again is
+     * a no-op.
+     *
+     * @throws IOException if closing a stream or stopping the channel fails
+     */
+    public synchronized void close() throws IOException {
+        if( closed )
+            return;
+        closed = true;
+        // Nested finally blocks: a failure while closing one stream must not
+        // prevent the other stream from being closed or the channel from
+        // being stopped.
+        try {
+            inputStream.close();
+        } finally {
+            try {
+                outputStream.close();
+            } finally {
+                channel.stop();
+            }
+        }
+    }
+
+    /** Connecting is not supported; always throws. */
+    public void connect(SocketAddress endpoint) throws IOException {
+        throw new IOException("Not supported");
+    }
+
+    /** Connecting is not supported; always throws. */
+    public void connect(SocketAddress endpoint, int timeout) throws IOException {
+        throw new IOException("Not supported");
+    }
+
+    /** @return always null; no NIO channel backs this socket */
+    public SocketChannel getChannel() {
+        return null;
+    }
+
+    /** @return the input stream adapter over the channel */
+    public InputStream getInputStream() throws IOException {
+        return inputStream;
+    }
+
+    /** @return the output stream adapter over the channel */
+    public OutputStream getOutputStream() throws IOException {
+        return outputStream;
+    }
+
+    /** @return true when the input stream adapter has been closed */
+    public boolean isInputShutdown() {
+        return inputStream.isClosed();
+    }
+
+    /** @return true when the output stream adapter has been closed */
+    public boolean isOutputShutdown() {
+        return outputStream.isClosed();
+    }
+
+    /**
+     * Writes the given value to the channel as a one-byte packet.
+     *
+     * @param data the value to send
+     */
+    public void sendUrgentData(int data) throws IOException {
+        urgentPacket.clear();
+        urgentPacket.write(data);
+        urgentPacket.flip();
+        channel.write(urgentPacket);
+    }
+
+    /** @return the input stream adapter's timeout, narrowed to an int */
+    public int getSoTimeout() throws SocketException {
+        return (int) inputStream.getTimeout();
+    }
+
+    /** Sets the timeout on the input stream adapter. */
+    public synchronized void setSoTimeout(int timeout) throws SocketException {
+        inputStream.setTimeout(timeout);
+    }
+
+    /** Closes the output stream adapter. */
+    public void shutdownOutput() throws IOException {
+        outputStream.close();
+    }
+
+    /** Closes the input stream adapter. */
+    public void shutdownInput() throws IOException {
+        inputStream.close();
+    }
+
+    /**
+     * @return the socket metadata, never null
+     * @throws SocketException if no socket metadata is available
+     */
+    protected SocketMetadata getSocketMetadata() throws SocketException {
+        if( socketMetadata == null )
+            throw new SocketException("No socket metadata available.");
+        return socketMetadata;
+    }
+
+    // Address and port getters cannot throw SocketException, so they fall
+    // back to null / -1 when no metadata is available.
+
+    public InetAddress getInetAddress() {
+        if( socketMetadata == null )
+            return null;
+        return socketMetadata.getInetAddress();
+    }
+    public boolean getKeepAlive() throws SocketException {
+        return getSocketMetadata().getKeepAlive();
+    }
+    public InetAddress getLocalAddress() {
+        if( socketMetadata == null )
+            return null;
+        return socketMetadata.getLocalAddress();
+    }
+    public int getLocalPort() {
+        if( socketMetadata == null )
+            return -1;
+        return socketMetadata.getLocalPort();
+    }
+    public SocketAddress getLocalSocketAddress() {
+        if( socketMetadata == null )
+            return null;
+        return socketMetadata.getLocalSocketAddress();
+    }
+    public boolean getOOBInline() throws SocketException {
+        return getSocketMetadata().getOOBInline();
+    }
+    public int getPort() {
+        if( socketMetadata == null )
+            return -1;
+        return socketMetadata.getPort();
+    }
+    public int getReceiveBufferSize() throws SocketException {
+        return getSocketMetadata().getReceiveBufferSize();
+    }
+    public SocketAddress getRemoteSocketAddress() {
+        if( socketMetadata == null )
+            return null;
+        return socketMetadata.getRemoteSocketAddress();
+    }
+    public boolean getReuseAddress() throws SocketException {
+        return getSocketMetadata().getReuseAddress();
+    }
+    public int getSendBufferSize() throws SocketException {
+        return getSocketMetadata().getSendBufferSize();
+    }
+    public int getSoLinger() throws SocketException {
+        return getSocketMetadata().getSoLinger();
+    }
+    public boolean getTcpNoDelay() throws SocketException {
+        return getSocketMetadata().getTcpNoDelay();
+    }
+    public int getTrafficClass() throws SocketException {
+        return getSocketMetadata().getTrafficClass();
+    }
+    public void setKeepAlive(boolean on) throws SocketException {
+        getSocketMetadata().setKeepAlive(on);
+    }
+    public void setOOBInline(boolean on) throws SocketException {
+        getSocketMetadata().setOOBInline(on);
+    }
+    public void setReceiveBufferSize(int size) throws SocketException {
+        getSocketMetadata().setReceiveBufferSize(size);
+    }
+    public void setReuseAddress(boolean on) throws SocketException {
+        getSocketMetadata().setReuseAddress(on);
+    }
+    public void setSendBufferSize(int size) throws SocketException {
+        getSocketMetadata().setSendBufferSize(size);
+    }
+    public void setSoLinger(boolean on, int linger) throws SocketException {
+        getSocketMetadata().setSoLinger(on, linger);
+    }
+    public void setTcpNoDelay(boolean on) throws SocketException {
+        getSocketMetadata().setTcpNoDelay(on);
+    }
+    public void setTrafficClass(int tc) throws SocketException {
+        getSocketMetadata().setTrafficClass(tc);
+    }
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncChannelToSocket.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,200 +1,200 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.adapter;
-
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activeio.Channel;
-import org.apache.activeio.ChannelFactory;
-import org.apache.activeio.packet.EOSPacket;
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.AsyncChannelListener;
-import org.apache.activeio.packet.sync.SyncChannel;
-
-import java.io.IOException;
-
-/**
- * Adapts a {@see org.apache.activeio.SynchChannel} so that it provides an 
- * {@see org.apache.activeio.AsyncChannel} interface.  When this channel
- * is started, a background thread is used to poll the {@see org.apache.activeio.SynchChannel}
- *  for packets comming up the channel which are then delivered to the 
- * {@see org.apache.activeio.ChannelConsumer}.
- * 
- * @version $Revision$
- */
-public class SyncToAsyncChannel implements AsyncChannel, Runnable {
-
-    private final AtomicBoolean running = new AtomicBoolean(false);
-    private final SyncChannel syncChannel;
-    private final Executor executor;
-    private AsyncChannelListener channelListener;
-    private CountDownLatch doneCountDownLatch;
-    
-    
-    static public AsyncChannel adapt(Channel channel) {
-        return adapt(channel, ChannelFactory.DEFAULT_EXECUTOR);
-    }
-
-    static public AsyncChannel adapt(Channel channel, Executor executor) {
-        
-        // It might not need adapting
-        if( channel instanceof AsyncChannel ) {
-            return (AsyncChannel) channel;
-        }
-        
-        // Can we just just undo the adaptor
-        if( channel.getClass() == SyncToAsyncChannel.class ) {
-            return ((AsyncToSyncChannel)channel).getAsyncChannel();
-        }
-        
-        return new SyncToAsyncChannel((SyncChannel) channel, executor);
-        
-    }
-
-    
-    /**
-     * @deprecated {@see #adapt(SynchChannel)}
-     */
-    public SyncToAsyncChannel(SyncChannel syncChannel) {
-        this(syncChannel, ChannelFactory.DEFAULT_EXECUTOR);
-    }
-
-    /**
-     * @deprecated {@see #adapt(SynchChannel, Executor)}
-     */
-    public SyncToAsyncChannel(SyncChannel syncChannel, Executor executor) {
-        this.syncChannel = syncChannel;
-        this.executor = executor;
-    }
-
-    synchronized public void start() throws IOException {
-        if (running.compareAndSet(false, true)) {
-            
-            if (channelListener == null)
-                throw new IllegalStateException("UpPacketListener must be set before object can be started.");
-            
-            syncChannel.start();
-
-            doneCountDownLatch = new CountDownLatch(1);
-            executor.execute(this);
-        }
-    }
-
-    synchronized public void stop() throws IOException {
-        if (running.compareAndSet(true, false)) {
-            try {
-                doneCountDownLatch.await(5, TimeUnit.SECONDS);
-            } catch (Throwable e) {
-            }
-            syncChannel.stop();
-        }
-    }
-
-    /**
-     * reads packets from a Socket
-     */
-    public void run() {
-        
-        // Change the thread name.
-        String oldName = Thread.currentThread().getName();        
-        Thread.currentThread().setName( syncChannel.toString() );        
-        try {
-	        while (running.get()) {
-	            try {
-	                Packet packet = syncChannel.read(500);
-	                if( packet==null )
-	                    continue;    
-                    
-                    if( packet == EOSPacket.EOS_PACKET ) {
-                        channelListener.onPacket(packet);
-                        return;
-                    }
-                    
-                    if( packet.hasRemaining() ) {
-                        channelListener.onPacket(packet);
-                    }
-                    
-	            } catch (IOException e) {
-	                channelListener.onPacketError(e);
-	            } catch (Throwable e) {
-	                channelListener.onPacketError((IOException)new IOException("Unexpected Error: "+e).initCause(e));
-	            }
-	        }
-        } finally {
-            if( doneCountDownLatch!=null )
-                doneCountDownLatch.countDown();
-            Thread.currentThread().setName(oldName);
-        }
-    }
-
-    /**
-     * @see org.apache.activeio.packet.async.AsyncChannel#setAsyncChannelListener(org.apache.activeio.UpPacketListener)
-     */
-    public void setAsyncChannelListener(AsyncChannelListener channelListener) {
-        if (running.get())
-            throw new IllegalStateException("Cannot change the UpPacketListener while the object is running.");
-        this.channelListener = channelListener;
-    }
-
-    /**
-     * @see org.apache.activeio.Channel#write(org.apache.activeio.packet.Packet)
-     */
-    public void write(org.apache.activeio.packet.Packet packet) throws IOException {
-        syncChannel.write(packet);
-    }
-
-    /**
-     * @see org.apache.activeio.Channel#flush()
-     */
-    public void flush() throws IOException {
-        syncChannel.flush();
-    }
-
-    /**
-     * @see org.apache.activeio.Disposable#dispose()
-     */
-    public void dispose() {
-        try {
-            stop();
-        } catch ( IOException ignore) {
-        }
-        syncChannel.dispose();        
-    }
-
-    public AsyncChannelListener getAsyncChannelListener() {
-        return channelListener;
-    }
-    
-    public Object getAdapter(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
-            return this;
-        }
-        return syncChannel.getAdapter(target);
-    }    
-    
-    public SyncChannel getSynchChannel() {
-        return syncChannel;
-    }
-
-    public String toString() {
-        return syncChannel.toString();
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.adapter;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activeio.Channel;
+import org.apache.activeio.ChannelFactory;
+import org.apache.activeio.packet.EOSPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelListener;
+import org.apache.activeio.packet.sync.SyncChannel;
+
+import java.io.IOException;
+
+/**
+ * Adapts a {@link SyncChannel} so that it provides an {@link AsyncChannel}
+ * interface. When this channel is started, a task submitted to the
+ * configured {@link Executor} polls the {@link SyncChannel} for packets
+ * coming up the channel, which are then delivered to the registered
+ * {@link AsyncChannelListener}.
+ *
+ * @version $Revision$
+ */
+public class SyncToAsyncChannel implements AsyncChannel, Runnable {
+
+    private final AtomicBoolean running = new AtomicBoolean(false);
+    private final SyncChannel syncChannel;
+    private final Executor executor;
+    private AsyncChannelListener channelListener;
+    /** Counted down by {@link #run()} on exit so {@link #stop()} can wait for it. */
+    private CountDownLatch doneCountDownLatch;
+
+
+    /**
+     * Adapts the channel using {@link ChannelFactory#DEFAULT_EXECUTOR}.
+     *
+     * @param channel the channel to adapt
+     * @return an asynchronous view of the channel
+     */
+    static public AsyncChannel adapt(Channel channel) {
+        return adapt(channel, ChannelFactory.DEFAULT_EXECUTOR);
+    }
+
+    /**
+     * Returns an {@link AsyncChannel} view of the given channel, wrapping it
+     * only when necessary.
+     *
+     * @param channel the channel to adapt
+     * @param executor runs the polling task of a newly created adapter
+     * @return the channel itself if it already is an {@link AsyncChannel},
+     *         the wrapped channel if it is an {@link AsyncToSyncChannel},
+     *         otherwise a new adapter around it
+     */
+    static public AsyncChannel adapt(Channel channel, Executor executor) {
+
+        // It might not need adapting
+        if( channel instanceof AsyncChannel ) {
+            return (AsyncChannel) channel;
+        }
+
+        // Can we just undo the adaptor? The class tested here must be the
+        // one we cast to; testing SyncToAsyncChannel.class could never match
+        // (it is an AsyncChannel and was handled above).
+        if( channel.getClass() == AsyncToSyncChannel.class ) {
+            return ((AsyncToSyncChannel)channel).getAsyncChannel();
+        }
+
+        return new SyncToAsyncChannel((SyncChannel) channel, executor);
+
+    }
+
+
+    /**
+     * @deprecated use {@link #adapt(Channel)}
+     */
+    public SyncToAsyncChannel(SyncChannel syncChannel) {
+        this(syncChannel, ChannelFactory.DEFAULT_EXECUTOR);
+    }
+
+    /**
+     * @deprecated use {@link #adapt(Channel, Executor)}
+     */
+    public SyncToAsyncChannel(SyncChannel syncChannel, Executor executor) {
+        this.syncChannel = syncChannel;
+        this.executor = executor;
+    }
+
+    /**
+     * Starts the underlying channel and submits the polling task. Does
+     * nothing if already running.
+     *
+     * @throws IllegalStateException if no listener has been set
+     * @throws IOException if the underlying channel fails to start
+     */
+    synchronized public void start() throws IOException {
+        if (!running.compareAndSet(false, true))
+            return;
+
+        boolean started = false;
+        try {
+            if (channelListener == null)
+                throw new IllegalStateException("UpPacketListener must be set before object can be started.");
+
+            syncChannel.start();
+
+            doneCountDownLatch = new CountDownLatch(1);
+            executor.execute(this);
+            started = true;
+        } finally {
+            // A failed start must not leave the flag set: otherwise the
+            // listener could no longer be set and a later start() would
+            // silently do nothing.
+            if (!started)
+                running.set(false);
+        }
+    }
+
+    /**
+     * Signals the polling task to finish, waits up to five seconds for it,
+     * then stops the underlying channel. Does nothing if not running.
+     *
+     * @throws IOException if the underlying channel fails to stop
+     */
+    synchronized public void stop() throws IOException {
+        if (running.compareAndSet(true, false)) {
+            try {
+                doneCountDownLatch.await(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the caller, then carry on
+                // stopping the channel.
+                Thread.currentThread().interrupt();
+            } catch (Throwable ignored) {
+                // Best effort: the wait must not prevent the channel stop.
+            }
+            syncChannel.stop();
+        }
+    }
+
+    /**
+     * Polls the synchronous channel while running and forwards packets and
+     * errors to the listener. Returns after delivering the end-of-stream
+     * packet.
+     */
+    public void run() {
+
+        // Change the thread name.
+        String oldName = Thread.currentThread().getName();
+        Thread.currentThread().setName( syncChannel.toString() );
+        try {
+            while (running.get()) {
+                try {
+                    // Short read timeout so the running flag is re-checked
+                    // regularly.
+                    Packet packet = syncChannel.read(500);
+                    if( packet==null )
+                        continue;
+
+                    if( packet == EOSPacket.EOS_PACKET ) {
+                        channelListener.onPacket(packet);
+                        return;
+                    }
+
+                    if( packet.hasRemaining() ) {
+                        channelListener.onPacket(packet);
+                    }
+
+                } catch (IOException e) {
+                    channelListener.onPacketError(e);
+                } catch (Throwable e) {
+                    channelListener.onPacketError((IOException)new IOException("Unexpected Error: "+e).initCause(e));
+                }
+            }
+        } finally {
+            if( doneCountDownLatch!=null )
+                doneCountDownLatch.countDown();
+            Thread.currentThread().setName(oldName);
+        }
+    }
+
+    /**
+     * @see org.apache.activeio.packet.async.AsyncChannel#setAsyncChannelListener(org.apache.activeio.packet.async.AsyncChannelListener)
+     * @throws IllegalStateException if the channel is running
+     */
+    public void setAsyncChannelListener(AsyncChannelListener channelListener) {
+        if (running.get())
+            throw new IllegalStateException("Cannot change the UpPacketListener while the object is running.");
+        this.channelListener = channelListener;
+    }
+
+    /**
+     * Writes the packet to the underlying synchronous channel.
+     */
+    public void write(org.apache.activeio.packet.Packet packet) throws IOException {
+        syncChannel.write(packet);
+    }
+
+    /**
+     * Flushes the underlying synchronous channel.
+     */
+    public void flush() throws IOException {
+        syncChannel.flush();
+    }
+
+    /**
+     * Stops this adapter, ignoring any I/O error, then disposes the
+     * underlying channel.
+     */
+    public void dispose() {
+        try {
+            stop();
+        } catch ( IOException ignore) {
+            // Disposal proceeds regardless of a failed stop.
+        }
+        syncChannel.dispose();
+    }
+
+    /** @return the listener currently registered, or null if none */
+    public AsyncChannelListener getAsyncChannelListener() {
+        return channelListener;
+    }
+
+    /**
+     * @return this adapter if it is an instance of the target type,
+     *         otherwise whatever the underlying channel's
+     *         {@code getAdapter} returns
+     */
+    public Object getAdapter(Class target) {
+        if( target.isAssignableFrom(getClass()) ) {
+            return this;
+        }
+        return syncChannel.getAdapter(target);
+    }
+
+    /** @return the adapted synchronous channel */
+    public SyncChannel getSynchChannel() {
+        return syncChannel;
+    }
+
+    /** @return the underlying channel's string form */
+    public String toString() {
+        return syncChannel.toString();
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannelFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannelFactory.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannelFactory.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannelFactory.java Tue Feb 21 15:12:56 2006
@@ -1,83 +1,83 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.adapter;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.activeio.ChannelFactory;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.AsyncChannelFactory;
-import org.apache.activeio.packet.async.AsyncChannelServer;
-import org.apache.activeio.packet.sync.SyncChannelFactory;
-
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-
-/**
- * @version $Revision$
- */
-public class SyncToAsyncChannelFactory implements AsyncChannelFactory {
-    
-    private final SyncChannelFactory syncChannelFactory;
-    private final Executor executor;
-        
-    static public AsyncChannelFactory adapt(SyncChannelFactory channelFactory) {
-        return adapt(channelFactory, ChannelFactory.DEFAULT_EXECUTOR);
-    }
-    
-    static public AsyncChannelFactory adapt(SyncChannelFactory channelFactory, Executor executor ) {
-
-        // It might not need adapting
-        if( channelFactory instanceof AsyncChannelFactory ) {
-            return (AsyncChannelFactory) channelFactory;
-        }
-
-        // Can we just just undo the adaptor
-        if( channelFactory.getClass() == AsyncToSyncChannelFactory.class ) {
-            return ((AsyncToSyncChannelFactory)channelFactory).getAsyncChannelFactory();
-        }
-        
-        return new SyncToAsyncChannelFactory((SyncChannelFactory)channelFactory, executor);        
-    }
-    
-    /**
-     * @deprecated {@see #adapt(SyncChannelFactory)}
-     */
-    public SyncToAsyncChannelFactory(final SyncChannelFactory next) {
-        this(next, ChannelFactory.DEFAULT_EXECUTOR);
-    }
-    
-    /**
-     * @deprecated {@see #adapt(SyncChannelFactory, Executor)}
-     */
-    public SyncToAsyncChannelFactory(final SyncChannelFactory next, Executor executor) {
-        this.syncChannelFactory = next;
-        this.executor = executor;
-    }
-        
-    public AsyncChannel openAsyncChannel(URI location) throws IOException {
-        return SyncToAsyncChannel.adapt(syncChannelFactory.openSyncChannel(location),executor);
-    }
-
-    public AsyncChannelServer bindAsyncChannel(URI location) throws IOException {
-        return new SyncToAsyncChannelServer(syncChannelFactory.bindSyncChannel(location),executor);
-    }
-    
-    public SyncChannelFactory getSyncChannelFactory() {
-        return syncChannelFactory;
-    }
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.adapter;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activeio.ChannelFactory;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelFactory;
+import org.apache.activeio.packet.async.AsyncChannelServer;
+import org.apache.activeio.packet.sync.SyncChannelFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+
+/**
+ * @version $Revision$
+ */
+public class SyncToAsyncChannelFactory implements AsyncChannelFactory {
+    
+    private final SyncChannelFactory syncChannelFactory;
+    private final Executor executor;
+        
+    static public AsyncChannelFactory adapt(SyncChannelFactory channelFactory) {
+        return adapt(channelFactory, ChannelFactory.DEFAULT_EXECUTOR);
+    }
+    
+    static public AsyncChannelFactory adapt(SyncChannelFactory channelFactory, Executor executor ) {
+
+        // It might not need adapting
+        if( channelFactory instanceof AsyncChannelFactory ) {
+            return (AsyncChannelFactory) channelFactory;
+        }
+
+        // Can we just undo the adaptor?
+        if( channelFactory.getClass() == AsyncToSyncChannelFactory.class ) {
+            return ((AsyncToSyncChannelFactory)channelFactory).getAsyncChannelFactory();
+        }
+        
+        return new SyncToAsyncChannelFactory((SyncChannelFactory)channelFactory, executor);        
+    }
+    
+    /**
+     * @deprecated use {@link #adapt(SyncChannelFactory)} instead
+     */
+    public SyncToAsyncChannelFactory(final SyncChannelFactory next) {
+        this(next, ChannelFactory.DEFAULT_EXECUTOR);
+    }
+    
+    /**
+     * @deprecated use {@link #adapt(SyncChannelFactory, Executor)} instead
+     */
+    public SyncToAsyncChannelFactory(final SyncChannelFactory next, Executor executor) {
+        this.syncChannelFactory = next;
+        this.executor = executor;
+    }
+        
+    public AsyncChannel openAsyncChannel(URI location) throws IOException {
+        return SyncToAsyncChannel.adapt(syncChannelFactory.openSyncChannel(location),executor);
+    }
+
+    public AsyncChannelServer bindAsyncChannel(URI location) throws IOException {
+        return new SyncToAsyncChannelServer(syncChannelFactory.bindSyncChannel(location),executor);
+    }
+    
+    public SyncChannelFactory getSyncChannelFactory() {
+        return syncChannelFactory;
+    }
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannelFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannelServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannelServer.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannelServer.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannelServer.java Tue Feb 21 15:12:56 2006
@@ -1,171 +1,171 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activeio.adapter;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.activeio.AcceptListener;
-import org.apache.activeio.Channel;
-import org.apache.activeio.ChannelFactory;
-import org.apache.activeio.ChannelServer;
-import org.apache.activeio.packet.async.AsyncChannelServer;
-import org.apache.activeio.packet.sync.SyncChannelServer;
-
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Adapts a {@see org.apache.activeio,SynchChannelServer} so that it provides an 
- * {@see org.apache.activeio.AsyncChannelServer} interface.  When this channel
- * is started, a background thread is used to poll the (@see org.apache.activeio.SynchChannelServer}
- * for accepted channel connections which are then delivered to the {@see org.apache.activeio.AcceptConsumer}.
- * 
- * @version $Revision$
- */
-final public class SyncToAsyncChannelServer implements AsyncChannelServer, Runnable {
-
-    private final SyncChannelServer syncChannelServer;
-    private final AtomicBoolean running = new AtomicBoolean(false);
-    private final Executor executor;
-    private AcceptListener acceptListener;
-    private CountDownLatch doneCountDownLatch;
-    
-    
-    static public AsyncChannelServer adapt(ChannelServer channel) {
-        return adapt(channel, ChannelFactory.DEFAULT_EXECUTOR);
-    }
-
-    static public AsyncChannelServer adapt(ChannelServer channel, Executor executor) {
-
-        // It might not need adapting
-        if( channel instanceof AsyncChannelServer ) {
-            return (AsyncChannelServer) channel;
-        }
-
-        // Can we just just undo the adaptor
-        if( channel.getClass() == SyncToAsyncChannel.class ) {
-            return ((AsyncToSyncChannelServer)channel).getAsyncChannelServer();
-        }
-        
-        return new SyncToAsyncChannelServer((SyncChannelServer)channel, executor);        
-    }
-    
-    public SyncToAsyncChannelServer(SyncChannelServer syncServer) {
-        this(syncServer, ChannelFactory.DEFAULT_EXECUTOR);
-    }
-    
-    public SyncToAsyncChannelServer(SyncChannelServer syncServer, Executor executor) {
-        this.syncChannelServer = syncServer;        
-        this.executor=executor;
-    }
-    
-    synchronized public void start() throws IOException {        
-        if (running.compareAndSet(false, true)) {
-            
-            if( acceptListener == null )
-                throw new IllegalStateException("AcceptListener must be set before object can be started.");
-
-            syncChannelServer.start();
-            
-            doneCountDownLatch = new CountDownLatch(1);
-            executor.execute(this);
-        }
-    }
-
-    synchronized public void stop() throws IOException {
-        if (running.compareAndSet(true, false)) {
-            try {
-                doneCountDownLatch.await(5, TimeUnit.SECONDS);
-            } catch (Throwable e) {
-            }
-            syncChannelServer.stop();
-        }
-    }
-
-    public void run() {
-        // Change the thread name.
-        String oldName = Thread.currentThread().getName();        
-        Thread.currentThread().setName( syncChannelServer.toString() );
-        try {
-	        while (running.get()) {
-	            try {
-	                Channel channel = syncChannelServer.accept(500);
-	                if( channel == null )
-	                    continue;                
-	                acceptListener.onAccept(channel);
-	            } catch (IOException e) {
-	                if( running.get() )
-	                    acceptListener.onAcceptError(e);        
-	        	} catch (Throwable e) {        	    
-	                if( running.get() )
-	                    acceptListener.onAcceptError((IOException)new IOException("Unexpected Error: "+e).initCause(e));
-	        	}
-	        }
-        } finally {
-            if( doneCountDownLatch!=null )
-                doneCountDownLatch.countDown();
-            Thread.currentThread().setName(oldName);            
-        }
-    }
-
-    /**
-     * @see org.apache.activeio.packet.async.AsyncChannelServer#setAcceptListener(org.apache.activeio.AcceptListener)
-     */
-    public void setAcceptListener(AcceptListener acceptListener) {
-        if(running.get()) 
-            throw new IllegalStateException("Cannot change the AcceptListener while the object is running.");        
-        this.acceptListener = acceptListener;
-    }
-
-    /**
-     * @see org.apache.activeio.Disposable#dispose()
-     */
-    public void dispose() {
-        try {
-            stop();
-        } catch ( IOException ignore) {
-        }
-        syncChannelServer.dispose();
-    }
-
-    public URI getBindURI() {
-        return syncChannelServer.getBindURI();
-    }
-
-    public URI getConnectURI() {
-        return syncChannelServer.getConnectURI();
-    }
-
-    public SyncChannelServer getSynchChannelServer() {
-        return syncChannelServer;
-    }
-    
-    public Object getAdapter(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
-            return this;
-        }
-        return syncChannelServer.getAdapter(target);
-    }    
-    
-    public String toString() {
-        return syncChannelServer.toString();
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activeio.adapter;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activeio.AcceptListener;
+import org.apache.activeio.Channel;
+import org.apache.activeio.ChannelFactory;
+import org.apache.activeio.ChannelServer;
+import org.apache.activeio.packet.async.AsyncChannelServer;
+import org.apache.activeio.packet.sync.SyncChannelServer;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Adapts a {@link org.apache.activeio.packet.sync.SyncChannelServer} so that it provides an 
+ * {@link org.apache.activeio.packet.async.AsyncChannelServer} interface.  When this channel server
+ * is started, a background task is used to poll the {@link org.apache.activeio.packet.sync.SyncChannelServer}
+ * for accepted channel connections which are then delivered to the {@link org.apache.activeio.AcceptListener}.
+ * 
+ * @version $Revision$
+ */
+final public class SyncToAsyncChannelServer implements AsyncChannelServer, Runnable {
+
+    private final SyncChannelServer syncChannelServer;
+    private final AtomicBoolean running = new AtomicBoolean(false);
+    private final Executor executor;
+    private AcceptListener acceptListener;
+    private CountDownLatch doneCountDownLatch;
+    
+    
+    static public AsyncChannelServer adapt(ChannelServer channel) {
+        return adapt(channel, ChannelFactory.DEFAULT_EXECUTOR);
+    }
+
+    static public AsyncChannelServer adapt(ChannelServer channel, Executor executor) {
+        // Returns an AsyncChannelServer view of channel, wrapping it only when needed.
+        // It might not need adapting
+        if( channel instanceof AsyncChannelServer ) {
+            return (AsyncChannelServer) channel;
+        }
+        // Can we just undo the adaptor?  The class tested must be the class we cast
+        // to; testing SyncToAsyncChannel.class here could only end in a ClassCastException.
+        if( channel.getClass() == AsyncToSyncChannelServer.class ) {
+            return ((AsyncToSyncChannelServer)channel).getAsyncChannelServer();
+        }
+        // Otherwise wrap it; accepted channels are polled for on the given executor.
+        return new SyncToAsyncChannelServer((SyncChannelServer)channel, executor);
+    }
+    
+    public SyncToAsyncChannelServer(SyncChannelServer syncServer) {
+        this(syncServer, ChannelFactory.DEFAULT_EXECUTOR);
+    }
+    
+    public SyncToAsyncChannelServer(SyncChannelServer syncServer, Executor executor) {
+        this.syncChannelServer = syncServer;        
+        this.executor=executor;
+    }
+    
+    synchronized public void start() throws IOException {        
+        if (running.compareAndSet(false, true)) {
+            
+            if( acceptListener == null )
+                throw new IllegalStateException("AcceptListener must be set before object can be started.");
+
+            syncChannelServer.start();
+            
+            doneCountDownLatch = new CountDownLatch(1);
+            executor.execute(this);
+        }
+    }
+
+    synchronized public void stop() throws IOException {
+        if (running.compareAndSet(true, false)) {
+            try {
+                doneCountDownLatch.await(5, TimeUnit.SECONDS);
+            } catch (Throwable e) {
+            }
+            syncChannelServer.stop();
+        }
+    }
+
+    public void run() {
+        // Change the thread name.
+        String oldName = Thread.currentThread().getName();        
+        Thread.currentThread().setName( syncChannelServer.toString() );
+        try {
+	        while (running.get()) {
+	            try {
+	                Channel channel = syncChannelServer.accept(500);
+	                if( channel == null )
+	                    continue;                
+	                acceptListener.onAccept(channel);
+	            } catch (IOException e) {
+	                if( running.get() )
+	                    acceptListener.onAcceptError(e);        
+	        	} catch (Throwable e) {        	    
+	                if( running.get() )
+	                    acceptListener.onAcceptError((IOException)new IOException("Unexpected Error: "+e).initCause(e));
+	        	}
+	        }
+        } finally {
+            if( doneCountDownLatch!=null )
+                doneCountDownLatch.countDown();
+            Thread.currentThread().setName(oldName);            
+        }
+    }
+
+    /**
+     * @see org.apache.activeio.packet.async.AsyncChannelServer#setAcceptListener(org.apache.activeio.AcceptListener)
+     */
+    public void setAcceptListener(AcceptListener acceptListener) {
+        if(running.get()) 
+            throw new IllegalStateException("Cannot change the AcceptListener while the object is running.");        
+        this.acceptListener = acceptListener;
+    }
+
+    /**
+     * @see org.apache.activeio.Disposable#dispose()
+     */
+    public void dispose() {
+        try {
+            stop();
+        } catch ( IOException ignore) {
+        }
+        syncChannelServer.dispose();
+    }
+
+    public URI getBindURI() {
+        return syncChannelServer.getBindURI();
+    }
+
+    public URI getConnectURI() {
+        return syncChannelServer.getConnectURI();
+    }
+
+    public SyncChannelServer getSynchChannelServer() {
+        return syncChannelServer;
+    }
+    
+    public Object getAdapter(Class target) {
+        if( target.isAssignableFrom(getClass()) ) {
+            return this;
+        }
+        return syncChannelServer.getAdapter(target);
+    }    
+    
+    public String toString() {
+        return syncChannelServer.toString();
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SyncToAsyncChannelServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/adapter/SynchToAsynchChannelAdapter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/command/AsyncChannelToAsyncCommandChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/command/AsyncCommandChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/command/ClassLoading.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/command/ClassLoadingAwareObjectInputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/command/CommandListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/command/DefaultWireFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/command/WireFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/command/WireFormatFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/InvalidRecordLocationException.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/InvalidRecordLocationException.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/InvalidRecordLocationException.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/InvalidRecordLocationException.java Tue Feb 21 15:12:56 2006
@@ -1,59 +1,59 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal;
-
-/**
- * Exception thrown by a Journal to indicate that an invalid RecordLocation was detected.
- * 
- * @version $Revision: 1.1 $
- */
-public class InvalidRecordLocationException extends Exception {
-
-	/**
-     * Comment for <code>serialVersionUID</code>
-     */
-    private static final long serialVersionUID = 3618414947307239475L;
-
-    /**
-	 * 
-	 */
-	public InvalidRecordLocationException() {
-		super();
-	}
-
-	/**
-	 * @param msg
-	 */
-	public InvalidRecordLocationException(String msg) {
-		super(msg);
-	}
-
-	/**
-	 * @param msg
-	 * @param rootCause
-	 */
-	public InvalidRecordLocationException(String msg, Throwable rootCause) {
-		super(msg, rootCause);
-	}
-	
-	/**
-	 * @param rootCause
-	 */
-	public InvalidRecordLocationException(Throwable rootCause) {
-		super(rootCause);
-	}
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal;
+
+/**
+ * Exception thrown by a Journal to indicate that an invalid RecordLocation was detected.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class InvalidRecordLocationException extends Exception {
+
+	/**
+     * Comment for <code>serialVersionUID</code>
+     */
+    private static final long serialVersionUID = 3618414947307239475L;
+
+    /**
+	 * 
+	 */
+	public InvalidRecordLocationException() {
+		super();
+	}
+
+	/**
+	 * @param msg
+	 */
+	public InvalidRecordLocationException(String msg) {
+		super(msg);
+	}
+
+	/**
+	 * @param msg
+	 * @param rootCause
+	 */
+	public InvalidRecordLocationException(String msg, Throwable rootCause) {
+		super(msg, rootCause);
+	}
+	
+	/**
+	 * @param rootCause
+	 */
+	public InvalidRecordLocationException(Throwable rootCause) {
+		super(rootCause);
+	}
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/InvalidRecordLocationException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/Journal.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/Journal.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/Journal.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/Journal.java Tue Feb 21 15:12:56 2006
@@ -1,127 +1,127 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal;
-
-import java.io.IOException;
-
-import org.apache.activeio.packet.Packet;
-
-/**
- * A Journal is a record logging Interface that can be used to implement 
- * a transaction log.  
- * 
- * 
- * This interface was largely extracted out of the HOWL project to allow 
- * ActiveMQ to switch between different Journal implementations verry easily. 
- * 
- * @version $Revision: 1.1 $
- */
-public interface Journal {
-
-	/**
-	 * Writes a {@see Packet} of  data to the journal.  If <code>sync</code>
-	 * is true, then this call blocks until the data has landed on the physical 
-	 * disk.  Otherwise, this enqueues the write request and returns.
-	 * 
-	 * @param record - the data to be written to disk.
-	 * @param sync - If this call should block until the data lands on disk.
-	 * 
-	 * @return RecordLocation the location where the data will be written to on disk.
-	 * 
-	 * @throws IOException if the write failed.
-	 * @throws IllegalStateException if the journal is closed.
-	 */
-	public RecordLocation write(Packet packet, boolean sync) throws IOException, IllegalStateException;
-
-	/**
-	 * Reads a previously written record from the journal. 
-	 *  
-	 * @param location is where to read the record from.
-	 * 
-	 * @return the data previously written at the <code>location</code>.
-	 * 
-	 * @throws InvalidRecordLocationException if <code>location</code> parameter is out of range.  
-	 *         It cannot be a location that is before the current mark. 
-	 * @throws IOException if the record could not be read.
-	 * @throws IllegalStateException if the journal is closed.
-	 */
-	public Packet read(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException;
-
-	/**
-	 * Informs the journal that all the journal space up to the <code>location</code> is no longer
-	 * needed and can be reclaimed for reuse.
-	 * 
-	 * @param location the location of the record to mark.  All record locations before the marked 
-	 * location will no longger be vaild. 
-	 * 
-	 * @param sync if this call should block until the mark is set on the journal.
-	 * 
-	 * @throws InvalidRecordLocationException if <code>location</code> parameter is out of range.  
-	 *         It cannot be a location that is before the current mark. 
-	 * @throws IOException if the record could not be read.
-	 * @throws IllegalStateException if the journal is closed.
-	 */
-	public abstract void setMark(RecordLocation location, boolean sync)
-			throws InvalidRecordLocationException, IOException, IllegalStateException;
-	
-	/**
-	 * Obtains the mark that was set in the Journal.
-	 * 
-	 * @see read(RecordLocation location);
-	 * @return the mark that was set in the Journal.
-	 * @throws IllegalStateException if the journal is closed.
-	 */
-	public RecordLocation getMark() throws IllegalStateException;
-
-
-	/**
-	 * Close the Journal.  
-	 * This is blocking operation that waits for any pending put opperations to be forced to disk.
-	 * Once the Journal is closed, all other methods of the journal should throw IllegalStateException.
-	 * 
-	 * @throws IOException if an error occurs while the journal is being closed.
-	 */
-	public abstract void close() throws IOException;
-
-	/**
-	 * Allows you to get the next RecordLocation after the <code>location</code> that 
-	 * is in the journal.
-	 * 
-	 * @param location the reference location the is used to find the next location.
-	 * To get the oldest location available in the journal, <code>location</code> 
-	 * should be set to null.
-	 * 
-	 * 
-	 * @return the next record location
-	 * 
-	 * @throws InvalidRecordLocationException if <code>location</code> parameter is out of range.  
-	 *         It cannot be a location that is before the current mark. 
-	 * @throws IllegalStateException if the journal is closed.
-	 */
-	public abstract RecordLocation getNextRecordLocation(RecordLocation location)
-		throws InvalidRecordLocationException, IOException, IllegalStateException;
-
-
-	/**
-	 * Registers a <code>JournalEventListener</code> that will receive notifications from the Journal.
-	 * 
-	 * @param listener object that will receive journal events.
-	 * @throws IllegalStateException if the journal is closed.
-	 */
-	public abstract void setJournalEventListener(JournalEventListener listener) throws IllegalStateException;
-	
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal;
+
+import java.io.IOException;
+
+import org.apache.activeio.packet.Packet;
+
+/**
+ * A Journal is a record logging Interface that can be used to implement 
+ * a transaction log.  
+ * 
+ * 
+ * This interface was largely extracted out of the HOWL project to allow 
+ * ActiveMQ to switch between different Journal implementations very easily. 
+ * 
+ * @version $Revision: 1.1 $
+ */
+public interface Journal {
+
+	/**
+	 * Writes a {@link Packet} of data to the journal.  If <code>sync</code>
+	 * is true, then this call blocks until the data has landed on the physical 
+	 * disk.  Otherwise, this enqueues the write request and returns.
+	 * 
+	 * @param packet - the data to be written to disk.
+	 * @param sync - If this call should block until the data lands on disk.
+	 * 
+	 * @return RecordLocation the location where the data will be written to on disk.
+	 * 
+	 * @throws IOException if the write failed.
+	 * @throws IllegalStateException if the journal is closed.
+	 */
+	public RecordLocation write(Packet packet, boolean sync) throws IOException, IllegalStateException;
+
+	/**
+	 * Reads a previously written record from the journal. 
+	 *  
+	 * @param location is where to read the record from.
+	 * 
+	 * @return the data previously written at the <code>location</code>.
+	 * 
+	 * @throws InvalidRecordLocationException if <code>location</code> parameter is out of range.  
+	 *         It cannot be a location that is before the current mark. 
+	 * @throws IOException if the record could not be read.
+	 * @throws IllegalStateException if the journal is closed.
+	 */
+	public Packet read(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException;
+
+	/**
+	 * Informs the journal that all the journal space up to the <code>location</code> is no longer
+	 * needed and can be reclaimed for reuse.
+	 * 
+	 * @param location the location of the record to mark.  All record locations before the marked 
+	 * location will no longer be valid. 
+	 * 
+	 * @param sync if this call should block until the mark is set on the journal.
+	 * 
+	 * @throws InvalidRecordLocationException if <code>location</code> parameter is out of range.  
+	 *         It cannot be a location that is before the current mark. 
+	 * @throws IOException if the mark could not be set.
+	 * @throws IllegalStateException if the journal is closed.
+	 */
+	public abstract void setMark(RecordLocation location, boolean sync)
+			throws InvalidRecordLocationException, IOException, IllegalStateException;
+	
+	/**
+	 * Obtains the mark that was set in the Journal.
+	 * 
+	 * @see #read(RecordLocation)
+	 * @return the mark that was set in the Journal.
+	 * @throws IllegalStateException if the journal is closed.
+	 */
+	public RecordLocation getMark() throws IllegalStateException;
+
+
+	/**
+	 * Close the Journal.  
+	 * This is a blocking operation that waits for any pending write operations to be forced to disk.
+	 * Once the Journal is closed, all other methods of the journal should throw IllegalStateException.
+	 * 
+	 * @throws IOException if an error occurs while the journal is being closed.
+	 */
+	public abstract void close() throws IOException;
+
+	/**
+	 * Allows you to get the next RecordLocation after the <code>location</code> that 
+	 * is in the journal.
+	 * 
+	 * @param location the reference location that is used to find the next location.
+	 * To get the oldest location available in the journal, <code>location</code> 
+	 * should be set to null.
+	 * 
+	 * 
+	 * @return the next record location
+	 * 
+	 * @throws InvalidRecordLocationException if <code>location</code> parameter is out of range.  
+	 *         It cannot be a location that is before the current mark. 
+	 * @throws IllegalStateException if the journal is closed.
+	 */
+	public abstract RecordLocation getNextRecordLocation(RecordLocation location)
+		throws InvalidRecordLocationException, IOException, IllegalStateException;
+
+
+	/**
+	 * Registers a <code>JournalEventListener</code> that will receive notifications from the Journal.
+	 * 
+	 * @param listener object that will receive journal events.
+	 * @throws IllegalStateException if the journal is closed.
+	 */
+	public abstract void setJournalEventListener(JournalEventListener listener) throws IllegalStateException;
+	
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/Journal.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/JournalEventListener.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/JournalEventListener.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/JournalEventListener.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/JournalEventListener.java Tue Feb 21 15:12:56 2006
@@ -1,37 +1,37 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal;
-
-/**
- * Defines an object which listens for Journal Events.
- * 
- * @version $Revision: 1.1 $
- */
-public interface JournalEventListener {
-
-	/**
-	 * This event is issues when a Journal implementations wants to recover 
-	 * disk space used by old records.  If journal space is not reliquised 
-	 * by setting the Journal's mark at or past the <code>safeLocation</code>
-	 * further write opperations against the Journal may casuse IOExceptions 
-	 * to occur due to a log overflow condition.
-	 * 
-	 * @param safeLocation the oldest location that the journal recomends the mark to be set.
-	 */
-	void overflowNotification(RecordLocation safeLocation);
-
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal;
+
+/**
+ * Defines an object which listens for Journal Events.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public interface JournalEventListener {
+
+	/**
+	 * This event is issued when a Journal implementation wants to recover 
+	 * disk space used by old records.  If journal space is not relinquished 
+	 * by setting the Journal's mark at or past the <code>safeLocation</code>,
+	 * further write operations against the Journal may cause IOExceptions 
+	 * to occur due to a log overflow condition.
+	 * 
+	 * @param safeLocation the oldest location that the journal recommends the mark be set to.
+	 */
+	void overflowNotification(RecordLocation safeLocation);
+
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/JournalEventListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/RecordLocation.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/RecordLocation.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/RecordLocation.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/RecordLocation.java Tue Feb 21 15:12:56 2006
@@ -1,30 +1,30 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal;
-
-/**
- * A RecordLocation is used to locate data records that have been 
- * logged to a Journal via the <code>Journal.put()</code> method call.
- * 
- * RecordLocation are comparable on the position in the Journal 
- * where they reside.
- * 
- * @version $Revision: 1.1 $
- */
-public interface RecordLocation extends Comparable {
-
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal;
+
+/**
+ * A RecordLocation is used to locate data records that have been 
+ * logged to a Journal via the <code>Journal.write()</code> method call.
+ * 
+ * RecordLocation are comparable on the position in the Journal 
+ * where they reside.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public interface RecordLocation extends Comparable {
+
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/RecordLocation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/BatchedWrite.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/BatchedWrite.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/BatchedWrite.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/BatchedWrite.java Tue Feb 21 15:12:56 2006
@@ -1,142 +1,142 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal.active;
-
-import org.apache.activeio.packet.Packet;
-
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-
-/**
- * This contains all the data needed to write and force a list of records to a
- * LogFile. The more records that can be cramed into a single BatchedWrite, the
- * higher throughput that can be achived by a write and force operation.
- * 
- * @version $Revision: 1.1 $
- */
-final public class BatchedWrite {
-
-    private final Packet packet;
-    public Throwable error;
-    private Location mark;
-    private boolean appendDisabled = false;
-    private boolean appendInProgress = false;
-    private CountDownLatch writeDoneCountDownLatch;
-
-    /**
-     * @param packet
-     */
-    public BatchedWrite(Packet packet) {
-        this.packet = packet;
-    }
-
-    /**
-     * @throws InterruptedException
-     * 
-     */
-    synchronized private void disableAppend() throws InterruptedException {
-        appendDisabled = true;
-        while (appendInProgress) {
-            wait();
-        }
-    }
-
-    /**
-     * @param packet2
-     * @param mark2
-     * @return
-     */
-    public boolean append(Record record, Location recordMark, boolean force) {
-
-        synchronized (this) {
-            if (appendDisabled)
-                return false;
-            appendInProgress = true;
-        }
-        
-        
-        if( force && writeDoneCountDownLatch==null)
-            writeDoneCountDownLatch = new CountDownLatch(1);
-        
-        record.read(packet);
-
-        // if we fit the record in this batch
-        if ( !record.hasRemaining() ) {
-            if (recordMark != null)
-                mark = recordMark;
-        }
-
-        synchronized (this) {
-            appendInProgress = false;
-            this.notify();
-
-            if (appendDisabled)
-                return false;
-            else
-                return packet.remaining() > 0;
-        }
-    }
-
-    public void waitForForce() throws Throwable {
-        if( writeDoneCountDownLatch!=null ) {
-            writeDoneCountDownLatch.await();
-            synchronized (this) {
-                if (error != null)
-                    throw error;
-            }
-        }
-    }
-
-    public void forced() {
-        if( writeDoneCountDownLatch!=null ) {
-            writeDoneCountDownLatch.countDown();
-        }
-    }
-
-    public void writeFailed(Throwable error) {
-        if( writeDoneCountDownLatch!=null ) {
-            synchronized (this) {
-                this.error = error;
-            }
-            writeDoneCountDownLatch.countDown();
-        }
-    }
-
-    public Packet getPacket() {
-        return packet;
-    }
-
-    /**
-     * @return
-     */
-    public Location getMark() {
-        return mark;
-    }
-
-    /**
-     * @throws InterruptedException
-     * 
-     */
-    public void flip() throws InterruptedException {
-        disableAppend();
-        packet.flip();
-    }
-
-    public boolean getForce() {
-        return writeDoneCountDownLatch!=null;
-    }
-
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal.active;
+
+import org.apache.activeio.packet.Packet;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+
+/**
+ * This contains all the data needed to write and force a list of records to a
+ * LogFile. The more records that can be crammed into a single BatchedWrite, the
+ * higher the throughput that can be achieved by a write and force operation.
+ * 
+ * @version $Revision: 1.1 $
+ */
+final public class BatchedWrite {
+
+    private final Packet packet;
+    public Throwable error;
+    private Location mark;
+    private boolean appendDisabled = false;
+    private boolean appendInProgress = false;
+    private CountDownLatch writeDoneCountDownLatch;
+
+    /**
+     * @param packet
+     */
+    public BatchedWrite(Packet packet) {
+        this.packet = packet;
+    }
+
+    /**
+     * @throws InterruptedException
+     * 
+     */
+    synchronized private void disableAppend() throws InterruptedException {
+        appendDisabled = true;
+        while (appendInProgress) {
+            wait();
+        }
+    }
+
+    /**
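+     * @param record the record whose data is read into this batch's packet.
+     * @param recordMark mark to remember once the record has no data remaining; may be null.
+     * @return true if appending is still enabled and the packet has space remaining.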
+     */
+    public boolean append(Record record, Location recordMark, boolean force) {
+
+        synchronized (this) {
+            if (appendDisabled)
+                return false;
+            appendInProgress = true;
+        }
+        
+        
+        if( force && writeDoneCountDownLatch==null)
+            writeDoneCountDownLatch = new CountDownLatch(1);
+        
+        record.read(packet);
+
+        // if we fit the record in this batch
+        if ( !record.hasRemaining() ) {
+            if (recordMark != null)
+                mark = recordMark;
+        }
+
+        synchronized (this) {
+            appendInProgress = false;
+            this.notify();
+
+            if (appendDisabled)
+                return false;
+            else
+                return packet.remaining() > 0;
+        }
+    }
+
+    public void waitForForce() throws Throwable {
+        if( writeDoneCountDownLatch!=null ) {
+            writeDoneCountDownLatch.await();
+            synchronized (this) {
+                if (error != null)
+                    throw error;
+            }
+        }
+    }
+
+    public void forced() {
+        if( writeDoneCountDownLatch!=null ) {
+            writeDoneCountDownLatch.countDown();
+        }
+    }
+
+    public void writeFailed(Throwable error) {
+        if( writeDoneCountDownLatch!=null ) {
+            synchronized (this) {
+                this.error = error;
+            }
+            writeDoneCountDownLatch.countDown();
+        }
+    }
+
+    public Packet getPacket() {
+        return packet;
+    }
+
+    /**
+     * @return
+     */
+    public Location getMark() {
+        return mark;
+    }
+
+    /**
+     * @throws InterruptedException
+     * 
+     */
+    public void flip() throws InterruptedException {
+        disableAppend();
+        packet.flip();
+    }
+
+    public boolean getForce() {
+        return writeDoneCountDownLatch!=null;
+    }
+
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/BatchedWrite.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message