activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r734779 - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/impl/network/ main/java/org/apache/activeblaze/impl/processor/ main/java/org/apache/activeblaze/impl/reliable/ main/java/org...
Date Thu, 15 Jan 2009 18:45:31 GMT
Author: rajdavies
Date: Thu Jan 15 10:45:30 2009
New Revision: 734779

URL: http://svn.apache.org/viewvc?rev=734779&view=rev
Log:
Added network concept - a logical group of channels to broadcast to

Added:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
  (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
  (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
  (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
      - copied, changed from r734428, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
Removed:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
Modified:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
Thu Jan 15 10:45:30 2009
@@ -22,7 +22,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activeblaze.impl.destination.DestinationMatch;
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.impl.processor.CompressionProcessor;
 import org.apache.activeblaze.impl.processor.FragmentationProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
@@ -44,7 +44,7 @@
  * 
  * 
  */
-public class BlazeChannelImpl extends ChainedProcessor implements BlazeChannel, ExceptionListener
{
+public class BlazeChannelImpl extends DefaultChainedProcessor implements BlazeChannel, ExceptionListener
{
     protected Map<Buffer, BlazeTopicListener> topicessageListenerMap = new ConcurrentHashMap<Buffer,
BlazeTopicListener>();
     protected final IdGenerator idGenerator = new IdGenerator();
     protected Buffer producerId;
@@ -125,7 +125,7 @@
         FragmentationProcessor fp = new FragmentationProcessor();
         fp.setMaxPacketSize(maxPacketSize);
         result.setEnd(fp);
-        ChainedProcessor reliable = ReliableFactory.get(getConfiguration().getReliable());
+        DefaultChainedProcessor reliable = ReliableFactory.get(getConfiguration().getReliable());
         result.setEnd(reliable);
         result.setEnd(transport);
         return result;

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java?rev=734779&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
(added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
Thu Jan 15 10:45:30 2009
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.network;
+
+import java.net.URI;
+import org.apache.activeblaze.ExceptionListener;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.transport.BaseTransport;
+import org.apache.activeblaze.impl.transport.TransportFactory;
+
+/**
+ * Uses multicast to implement a Network
+ *
+ */
+public class MulticastNetwork  extends DefaultChainedProcessor implements Network, ExceptionListener{
+    
+    private URI uri;
+    private URI managementURI;
+    private BaseTransport broadcast;
+    private BaseTransport management;
+    private String name = "";
+    
+    /**
+     * @return the name
+     */
+    public String getName() {
+        return this.name;
+    }
+    /**
+     * @param name the name to set
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @param uri
+     * @see org.apache.activeblaze.impl.network.Network#setManagementURI(java.net.URI)
+     */
+    public void setManagementURI(URI uri) {
+     this.managementURI=uri;
+     
+        
+    }
+
+    /**
+     * @param uri
+     * @see org.apache.activeblaze.impl.network.Network#setURI(java.net.URI)
+     */
+    public void setURI(URI uri) {
+        this.uri=uri;
+        
+    }
+
+    
+   
+    /**
+     * @return true if initialized
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#init()
+     */
+    public boolean init() throws Exception {
+        boolean result = super.init();
+        if (result) {
+            this.broadcast  = TransportFactory.get(this.uri);
+            this.broadcast.setName(getName() + "-Broadcast");
+            this.broadcast.setExceptionListener(this);
+            this.broadcast.init();
+            if(this.managementURI != null && !this.managementURI.equals(this.uri)){
+                this.management =  TransportFactory.get(this.managementURI);
+                this.management.setName(getName() + "-Management");
+                this.management.setExceptionListener(this);
+                this.management.init();
+                
+            }
+        }
+        return result;
+    }
+
+  
+    /**
+     * @return true if shutDown
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#shutDown()
+     */
+    public boolean shutDown() throws Exception {
+        boolean result = super.shutDown();
+        if (this.broadcast!=null) {
+            this.broadcast.shutDown();
+        }
+        if (this.management!=null) {
+            this.management.shutDown();
+        }
+        return result;
+    }
+
+    /**
+     * @return true if started
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#start()
+     */
+    public boolean start() throws Exception {
+        boolean result = super.start();
+        if (this.broadcast!=null) {
+            this.broadcast.start();
+        }
+        if (this.management!=null) {
+            this.management.start();
+        }
+        return result;
+    }
+
+    /**
+     * @return true if stopped
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#stop()
+     */
+    public boolean stop() throws Exception {
+        boolean result = super.stop();
+        if (this.broadcast!=null) {
+            this.broadcast.stop();
+        }
+        if (this.management!=null) {
+            this.management.stop();
+        }
+        return result;
+    }
+    
+    
+    /**
+     * @param packet
+     * @throws Exception
+     * @see org.apache.activeblaze.impl.network.Network#downStreamManagement(org.apache.activeblaze.impl.processor.Packet)
+     */
+    public void downStreamManagement(Packet packet) throws Exception {
+        if (this.management != null) {
+            this.management.downStream(packet);
+        }else {
+            this.broadcast.downStream(packet);
+        }
+        
+    }
+
+    /**
+     * @param packet
+     * @throws Exception
+     * @see org.apache.activeblaze.Processor#downStream(org.apache.activeblaze.impl.processor.Packet)
+     */
+    public void downStream(Packet packet) throws Exception {
+        this.broadcast.downStream(packet);
+        
+    }
+
+    /**
+     * @param l the listener that {@link #onException(Exception)} forwards transport errors to
+     * @see org.apache.activeblaze.Processor#setExceptionListener(org.apache.activeblaze.ExceptionListener)
+     */
+    public void setExceptionListener(ExceptionListener l) {
+        // keep the listener; an empty stub here left onException() with nothing to notify
+        this.exceptionListener = l;
+    }
+    /**
+     * @param ex
+     * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
+     */
+    public void onException(Exception ex) {
+        if (this.exceptionListener!=null) {
+            this.exceptionListener.onException(ex);
+        }
+        
+    }
+
+    
+    
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java?rev=734779&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
(added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
Thu Jan 15 10:45:30 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.network;
+import java.net.URI;
+import org.apache.activeblaze.Service;
+import org.apache.activeblaze.Processor;
+import org.apache.activeblaze.impl.processor.Packet;
+
+/**
+ * <P>
+ * A <CODE>Network</CODE> defines operations that can be applied to remote
+ * channel instances
+ * 
+ */
+public interface Network extends Processor, Service {
+    
+    /**
+     * @return the name
+     */
+    public String getName() ;
+    /**
+     * @param name the name to set
+     */
+    public void setName(String name);
+    
+    /**
+     * Set the uri for the <Code>Network</Code> to use
+     * @param uri
+     */
+    public void setURI(URI uri);
+    
+    /**
+     * Set the uri for the <Code>Network</Code> to use for management
+     * @param uri
+     */
+    public void setManagementURI(URI uri);
+    
+        
+    /**
+     * Send a management packet - this may be on a different address
+     * @param packet
+     * @throws Exception
+     */
+    public void downStreamManagement(Packet packet) throws Exception;
+    
+    
+
+}
\ No newline at end of file

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java?rev=734779&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
(added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
Thu Jan 15 10:45:30 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.network;
+
+import java.net.URI;
+
+/**
+ * Creates {@link Network} instances, choosing the implementation from the scheme of a URI.
+ *
+ */
+public class NetworkFactory {
+    
+    /**
+     * @param location the URI whose scheme ("mcast" or "multicast") selects the implementation
+     * @return a new network (setURI is not called on it here), or null if the scheme is missing or unknown
+     * @throws Exception
+     */
+    public static Network get(URI location) throws Exception {
+        Network result = null;
+        // a null or relative URI has no scheme - yield "no network" rather than a NullPointerException
+        String scheme = (location == null || location.getScheme() == null) ? "" : location.getScheme().trim();
+        if (scheme.equalsIgnoreCase("mcast") || scheme.equalsIgnoreCase("multicast")){
+            result = new MulticastNetwork();
+        }
+        return result;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html?rev=734779&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html
(added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html
Thu Jan 15 10:45:30 2009
@@ -0,0 +1,27 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+A Transport that represents all reachable nodes. 
+A <Code>Network</Code> can be a multicast address, be defined by a list of urls,
+or use a central location service to determine where to locate channels.
+
+</body>
+</html>
\ No newline at end of file

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java
Thu Jan 15 10:45:30 2009
@@ -29,7 +29,7 @@
  * Compresses PacketData
  * 
  */
-public class CompressionProcessor extends ChainedProcessor {
+public class CompressionProcessor extends DefaultChainedProcessor {
     private int compressionLimit = 8192;
     private int compressionLevel = Deflater.BEST_COMPRESSION;
     private class CompressionStream extends GZIPOutputStream {

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
(from r734428, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java&r1=734428&r2=734779&rev=734779&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
Thu Jan 15 10:45:30 2009
@@ -27,13 +27,13 @@
  * Chains Processors together
  * 
  */
-public class ChainedProcessor extends BaseService implements Processor {
-    private static final Log LOG = LogFactory.getLog(ChainedProcessor.class);
+public class DefaultChainedProcessor extends BaseService implements Processor {
+    private static final Log LOG = LogFactory.getLog(DefaultChainedProcessor.class);
     private Processor next;
     private Processor prev;
     protected ExceptionListener exceptionListener;
 
-    protected ChainedProcessor() {
+    protected DefaultChainedProcessor() {
     }
 
     /**
@@ -50,17 +50,17 @@
      * 
      */
     public void setEnd(Processor next) {
-        ChainedProcessor target = this;
+        DefaultChainedProcessor target = this;
         Processor n = getNext();
         while (n != null) {
-            if (n instanceof ChainedProcessor) {
-                ChainedProcessor cn = (ChainedProcessor) n;
+            if (n instanceof DefaultChainedProcessor) {
+                DefaultChainedProcessor cn = (DefaultChainedProcessor) n;
                 target = cn;
                 n = cn.getNext();
             }
         }
-        if (next instanceof ChainedProcessor) {
-            target.setNextChain((ChainedProcessor) next);
+        if (next instanceof DefaultChainedProcessor) {
+            target.setNextChain((DefaultChainedProcessor) next);
         } else {
             target.next = next;
         }
@@ -87,12 +87,12 @@
      * 
      * @param p
      */
-    public void setNextChain(ChainedProcessor p) {
-        ChainedProcessor target = this;
+    public void setNextChain(DefaultChainedProcessor p) {
+        DefaultChainedProcessor target = this;
         Processor n = getNext();
         while (n != null) {
-            if (n instanceof ChainedProcessor) {
-                ChainedProcessor cn = (ChainedProcessor) n;
+            if (n instanceof DefaultChainedProcessor) {
+                DefaultChainedProcessor cn = (DefaultChainedProcessor) n;
                 target = cn;
                 n = cn.getNext();
             }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
Thu Jan 15 10:45:30 2009
@@ -30,7 +30,7 @@
  */
 
 @SuppressWarnings("serial")
-public class FragmentationProcessor extends ChainedProcessor {
+public class FragmentationProcessor extends DefaultChainedProcessor {
     private static final Log LOG = LogFactory.getLog(FragmentationProcessor.class);
     private int maxPacketSize = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;
     private int maxCacheSize = 16 * 1024;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
Thu Jan 15 10:45:30 2009
@@ -17,7 +17,7 @@
 package org.apache.activeblaze.impl.reliable;
 
 import java.util.Map;
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.util.ObjectFinder;
 import org.apache.activeblaze.util.PropertyUtil;
 
@@ -34,23 +34,23 @@
      * @return the configured transport from its URI
      * @throws Exception
      */
-    public static ChainedProcessor get(String location) throws Exception {
-        ChainedProcessor result  = findReliable(location);
+    public static DefaultChainedProcessor get(String location) throws Exception {
+        DefaultChainedProcessor result  = findReliable(location);
         configure(result, location);
         return result;
     }
     
-    static void configure(ChainedProcessor transport, String location) throws Exception {
+    static void configure(DefaultChainedProcessor transport, String location) throws Exception
{
         Map<String, String> options = PropertyUtil.parseParameters(location);
         PropertyUtil.setProperties(transport, options);
     }
     
-    private static ChainedProcessor findReliable(String location) throws Exception {
+    private static DefaultChainedProcessor findReliable(String location) throws Exception
{
     String scheme = PropertyUtil.stripBefore(location, '?');
     if (scheme == null) {
         throw new IllegalArgumentException("Reliability scheme not specified: [" + location
+ "]");
     }
-    ChainedProcessor result = (ChainedProcessor) OBJECT_FINDER.newInstance(scheme);
+    DefaultChainedProcessor result = (DefaultChainedProcessor) OBJECT_FINDER.newInstance(scheme);
     return result;
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java
Thu Jan 15 10:45:30 2009
@@ -16,14 +16,14 @@
  */
 package org.apache.activeblaze.impl.reliable.flow;
 
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
 
 /**
  * Simple FlowControl
  * 
  */
-public class SimpleFlow extends ChainedProcessor {
+public class SimpleFlow extends DefaultChainedProcessor {
     int maxWindowSize = 4 * 1024;
     int windowSize = 0;
     int pauseTime = 2;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
Thu Jan 15 10:45:30 2009
@@ -16,14 +16,14 @@
  */
 package org.apache.activeblaze.impl.reliable.simple;
 
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.impl.reliable.flow.SimpleFlow;
 
 /**
  * Very basic (none) reliability
  *
  */
-public class SimpleReliableProcessor extends ChainedProcessor{
+public class SimpleReliableProcessor extends DefaultChainedProcessor{
     
    private SimpleFlow simpleFlow;
    

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
Thu Jan 15 10:45:30 2009
@@ -17,7 +17,7 @@
 package org.apache.activeblaze.impl.transport;
 
 import java.net.SocketTimeoutException;
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -25,7 +25,7 @@
  * Thread associated with processing
  *
  */
-public abstract class ThreadChainedProcessor extends ChainedProcessor implements Runnable
{
+public abstract class ThreadChainedProcessor extends DefaultChainedProcessor implements Runnable
{
     private static final Log LOG = LogFactory.getLog(ThreadChainedProcessor.class);
     private int priority=Thread.NORM_PRIORITY;
     private boolean daemon;

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
(original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
Thu Jan 15 10:45:30 2009
@@ -27,13 +27,13 @@
 public class ChainedProcessorTest extends TestCase {
     public void testStart() throws Exception {
         final AtomicBoolean test = new AtomicBoolean();
-        ChainedProcessor target = new ChainedProcessor() {
+        DefaultChainedProcessor target = new DefaultChainedProcessor() {
             public boolean start() {
                 return test.getAndSet(true);
             }
         };
-        ChainedProcessor A = new ChainedProcessor();
-        ChainedProcessor B = new ChainedProcessor();
+        DefaultChainedProcessor A = new DefaultChainedProcessor();
+        DefaultChainedProcessor B = new DefaultChainedProcessor();
         A.setNext(B);
         A.setEnd(target);
         A.start();
@@ -42,13 +42,13 @@
 
     public void testStop() throws Exception {
         final AtomicBoolean test = new AtomicBoolean();
-        ChainedProcessor target = new ChainedProcessor() {
+        DefaultChainedProcessor target = new DefaultChainedProcessor() {
             public boolean stop() {
                 return test.getAndSet(true);
             }
         };
-        ChainedProcessor A = new ChainedProcessor();
-        ChainedProcessor B = new ChainedProcessor();
+        DefaultChainedProcessor A = new DefaultChainedProcessor();
+        DefaultChainedProcessor B = new DefaultChainedProcessor();
         A.setNext(B);
         A.setEnd(target);
         A.start();
@@ -58,15 +58,15 @@
 
     public void testDownStream() throws Exception {
         final AtomicBoolean test = new AtomicBoolean();
-        ChainedProcessor target = new ChainedProcessor() {
+        DefaultChainedProcessor target = new DefaultChainedProcessor() {
             public void downStream(Packet p) {
                 test.set(true);
             }
         };
-        ChainedProcessor A = new ChainedProcessor();
-        ChainedProcessor B = new ChainedProcessor();
-        ChainedProcessor C = new ChainedProcessor();
-        ChainedProcessor D = new ChainedProcessor();
+        DefaultChainedProcessor A = new DefaultChainedProcessor();
+        DefaultChainedProcessor B = new DefaultChainedProcessor();
+        DefaultChainedProcessor C = new DefaultChainedProcessor();
+        DefaultChainedProcessor D = new DefaultChainedProcessor();
         A.setEnd(B);
         A.setEnd(C);
         A.setEnd(D);
@@ -79,15 +79,15 @@
 
     public void testUpStream() throws Exception {
         final AtomicBoolean test = new AtomicBoolean();
-        ChainedProcessor target = new ChainedProcessor() {
+        DefaultChainedProcessor target = new DefaultChainedProcessor() {
             public void upStream(Packet p) {
                 test.set(true);
             }
         };
-        ChainedProcessor A = new ChainedProcessor();
-        ChainedProcessor B = new ChainedProcessor();
-        ChainedProcessor C = new ChainedProcessor();
-        ChainedProcessor D = new ChainedProcessor();
+        DefaultChainedProcessor A = new DefaultChainedProcessor();
+        DefaultChainedProcessor B = new DefaultChainedProcessor();
+        DefaultChainedProcessor C = new DefaultChainedProcessor();
+        DefaultChainedProcessor D = new DefaultChainedProcessor();
         target.setEnd(A);
         A.setEnd(B);
         A.setEnd(C);

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java?rev=734779&r1=734778&r2=734779&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java
(original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java
Thu Jan 15 10:45:30 2009
@@ -18,7 +18,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
 
 
@@ -26,7 +26,7 @@
  * Test Processor
  *
  */
-public class TerminatedChainedProcessor extends ChainedProcessor {
+public class TerminatedChainedProcessor extends DefaultChainedProcessor {
 
     private Packet result = null;
     private List<Packet> list = new ArrayList<Packet>();



Mime
View raw message