geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject [12/51] [partial] incubator-geode git commit: GEODE-77 removing the old jgroups subproject
Date Fri, 21 Aug 2015 21:22:36 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/obsolete/UDP.java.txt
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/obsolete/UDP.java.txt b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/obsolete/UDP.java.txt
deleted file mode 100644
index d5f8f63..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/obsolete/UDP.java.txt
+++ /dev/null
@@ -1,1921 +0,0 @@
-// $Id: UDP.java.txt,v 1.1 2005/06/24 11:19:28 belaban Exp $
-
-package org.jgroups.protocols;
-
-
-import org.jgroups.*;
-import org.jgroups.stack.IpAddress;
-import org.jgroups.stack.Protocol;
-import org.jgroups.util.List;
-import org.jgroups.util.*;
-import org.jgroups.util.Queue;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-
-
-
-/**
- * IP multicast transport based on UDP. Messages to the group (msg.dest == null) will
- * be multicast (to all group members), whereas point-to-point messages
- * (msg.dest != null) will be unicast to a single member. Uses a multicast and
- * a unicast socket.<p>
- * The following properties are being read by the UDP protocol<p>
- * param mcast_addr - the multicast address to use default is 228.8.8.8<br>
- * param mcast_port - (int) the port that the multicast is sent on default is 7600<br>
- * param ip_mcast - (boolean) flag whether to use IP multicast - default is true<br>
- * param ip_ttl - Set the default time-to-live for multicast packets sent out on this
- * socket. default is 64<br>
- * param use_packet_handler - If set, the mcast and ucast receiver threads just put
- * the datagram's payload (a byte buffer) into a queue, from where a separate thread
- * will dequeue and handle them (unmarshal and pass up). This frees the receiver
- * threads from having to do message unmarshalling; this time can now be spent
- * receiving packets. If you have lots of retransmissions because of network
- * input buffer overflow, consider setting this property to true (default is false).
- * @author Bela Ban
- */
-public class UDP extends Protocol implements Runnable {
-
-    /** Socket used for
-     * <ol>
-     * <li>sending unicast packets and
-     * <li>receiving unicast packets
-     * </ol>
-     * The address of this socket will be our local address (<tt>local_addr</tt>) */
-    DatagramSocket  sock=null;
-
-    /**
-     * BoundedList<Integer> of the last 100 ports used. This is to avoid reusing a port for DatagramSocket
-     */
-    private static BoundedList last_ports_used=null;
-
-    /** Maintain a list of local ports opened by DatagramSocket. If this is 0, this option is turned off.
-     * If bind_port is 0 (i.e. not set), then this option will be ignored */
-    int             num_last_ports=100;
-
-    /** IP multicast socket for <em>receiving</em> multicast packets */
-    MulticastSocket mcast_recv_sock=null;
-
-    /** IP multicast socket for <em>sending</em> multicast packets */
-    MulticastSocket mcast_send_sock=null;
-
-    /**
-     * Traffic class for sending unicast and multicast datagrams.
-     * Valid values are (check {@link java.net.DatagramSocket#setTrafficClass(int)} for details):
-     * <UL>
-     * <LI><CODE>IPTOS_LOWCOST (0x02)</CODE>, <b>decimal 2</b></LI>
-     * <LI><CODE>IPTOS_RELIABILITY (0x04)</CODE>, <b>decimal 4</b></LI>
-     * <LI><CODE>IPTOS_THROUGHPUT (0x08)</CODE>, <b>decimal 8</b></LI>
-     * <LI><CODE>IPTOS_LOWDELAY (0x10)</CODE>, <b>decimal 16</b></LI>
-     * </UL>
-     */
-    int             tos=0; // valid values: 2, 4, 8, 16
-
-    /** The address (host and port) of this member */
-    IpAddress       local_addr=null;
-
-    /** The name of the group to which this member is connected */
-    String          channel_name=null;
-
-    UdpHeader       udp_hdr=null;
-
-    /** The multicast address (mcast address and port) this member uses */
-    IpAddress       mcast_addr=null;
-
-    /** The interface (NIC) to which the unicast and multicast sockets bind */
-    InetAddress     bind_addr=null;
-
-    /** Bind the receiver multicast socket to all available interfaces (requires JDK 1.4) */
-    boolean         bind_to_all_interfaces=false;
-
-    /** The port to which the unicast receiver socket binds.
-     * 0 means to bind to any (ephemeral) port */
-    int             bind_port=0;
-	int				port_range=1; // 27-6-2003 bgooren, Only try one port by default
-
-    /** The multicast address used for sending and receiving packets */
-    String          mcast_addr_name="228.8.8.8";
-
-    /** The multicast port used for sending and receiving packets */
-    int             mcast_port=7600;
-
-    /** The multicast receiver thread */
-    Thread          mcast_receiver=null;
-
-    /** The unicast receiver thread */
-    UcastReceiver   ucast_receiver=null;
-
-    /** Whether to enable IP multicasting. If false, multiple unicast datagram
-     * packets are sent rather than one multicast packet */
-    boolean         ip_mcast=true;
-
-    /** The time-to-live (TTL) for multicast datagram packets */
-    int             ip_ttl=64;
-
-    /** The members of this group (updated when a member joins or leaves) */
-    final Vector    members=new Vector(11);
-
-    /** Pre-allocated byte stream. Used for serializing datagram packets. Will grow as needed */
-    final ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(1024);
-
-    /** Send buffer size of the multicast datagram socket */
-    int             mcast_send_buf_size=32000;
-
-    /** Receive buffer size of the multicast datagram socket */
-    int             mcast_recv_buf_size=64000;
-
-    /** Send buffer size of the unicast datagram socket */
-    int             ucast_send_buf_size=32000;
-
-    /** Receive buffer size of the unicast datagram socket */
-    int             ucast_recv_buf_size=64000;
-
-    /** If true, messages sent to self are treated specially: unicast messages are
-     * looped back immediately, multicast messages get a local copy first and -
-     * when the real copy arrives - it will be discarded. Useful for Windows
-     * media (non)sense */
-    boolean         loopback=true;
-
-
-    /** Discard packets with a different version. Usually minor version differences are okay. Setting this property
-     * to true means that we expect the exact same version on all incoming packets */
-    boolean         discard_incompatible_packets=false;
-
-    /** Sometimes receivers are overloaded (they have to handle de-serialization etc).
-     * Packet handler is a separate thread taking care of de-serialization, receiver
-     * thread(s) simply put packet in queue and return immediately. Setting this to
-     * true adds one more thread */
-    boolean         use_incoming_packet_handler=false;
-
-    /** Used by packet handler to store incoming DatagramPackets */
-    Queue           incoming_queue=null;
-
-    /** Dequeues entries from incoming_queue, unmarshals them and
-     * calls <tt>handleIncomingUdpPacket()</tt> */
-    IncomingPacketHandler   incoming_packet_handler=null;
-
-    /** Packets to be sent are stored in outgoing_queue and sent by a separate thread. Enabling this
-     * value uses an additional thread */
-    boolean         use_outgoing_packet_handler=false;
-
-    /** Used by packet handler to store outgoing DatagramPackets */
-    Queue           outgoing_queue=null;
-
-    OutgoingPacketHandler outgoing_packet_handler=null;
-
-    /** If set it will be added to <tt>local_addr</tt>. Used to implement
-     * for example transport independent addresses */
-    byte[]          additional_data=null;
-
-    /** Maximum number of bytes for messages to be queued until they are sent. This value needs to be smaller
-        than the largest UDP datagram packet size */
-    int max_bundle_size=AUTOCONF.senseMaxFragSizeStatic();
-
-    /** Max number of milliseconds until queued messages are sent. Messages are sent when max_bundle_size or
-     * max_bundle_timeout has been exceeded (whichever occurs faster)
-     */
-    long max_bundle_timeout=20;
-
-    /** Enabled bundling of smaller messages into bigger ones */
-    boolean enable_bundling=false;
-
-    /** Used by BundlingOutgoingPacketHandler */
-    TimeScheduler timer=null;
-
-    /** HashMap<Address, Address>. Keys=senders, values=destinations. For each incoming message M with sender S, adds
-     * an entry with key=S and value= sender's IP address and port.
-     */
-    HashMap addr_translation_table=new HashMap();
-
-    boolean use_addr_translation=false;
-
-    /** The name of this protocol */
-    static final String    name="UDP";
-
-    static final String IGNORE_BIND_ADDRESS_PROPERTY="ignore.bind.address";
-
-
-    /** Usually, src addresses are nulled, and the receiver simply sets them to the address of the sender. However,
-     * for multiple addresses on a Windows loopback device, this doesn't work
-     * (see http://jira.jboss.com/jira/browse/JGRP-79 and the JGroups wiki for details). This must be the same
-     * value for all members of the same group. Default is true, for performance reasons */
-    boolean null_src_addresses=true;
-
-    long num_msgs_sent=0, num_msgs_received=0, num_bytes_sent=0, num_bytes_received=0;
-
-
-
-    /**
-     * Creates a UDP protocol instance. Only the field initializers run here;
-     * the constructor itself opens no sockets and starts no threads.
-     */
-    public UDP() {
-    }
-
-    /**
-     * debug only
-     */
-    public String toString() {
-        return "UDP(local address: " + local_addr + ')';
-    }
-
-    /** Sets the four message/byte counters back to zero. */
-    public void resetStats() {
-        num_bytes_received=0;
-        num_bytes_sent=0;
-        num_msgs_received=0;
-        num_msgs_sent=0;
-    }
-
-    /**
-     * Returns the static list of recently used ports, creating it on first use
-     * with num_last_ports as constructor argument.
-     */
-    private BoundedList getLastPortsUsed() {
-        if(last_ports_used != null)
-            return last_ports_used;
-        last_ports_used=new BoundedList(num_last_ports);
-        return last_ports_used;
-    }
-
-    public long getNumMessagesSent()     {return num_msgs_sent;}
-    public long getNumMessagesReceived() {return num_msgs_received;}
-    public long getNumBytesSent()        {return num_bytes_sent;}
-    public long getNumBytesReceived()    {return num_bytes_received;}
-    public String getBindAddress() {return bind_addr != null? bind_addr.toString() : "null";}
-    public void setBindAddress(String bind_addr) throws UnknownHostException {
-        this.bind_addr=InetAddress.getByName(bind_addr);
-    }
-    public boolean getBindToAllInterfaces() {return bind_to_all_interfaces;}
-    public void setBindToAllInterfaces(boolean flag) {this.bind_to_all_interfaces=flag;}
-    public boolean isDiscardIncompatiblePackets() {return discard_incompatible_packets;}
-    public void setDiscardIncompatiblePackets(boolean flag) {discard_incompatible_packets=flag;}
-    public boolean isEnableBundling() {return enable_bundling;}
-    public void setEnableBundling(boolean flag) {enable_bundling=flag;}
-    public int getMaxBundleSize() {return max_bundle_size;}
-    public void setMaxBundleSize(int size) {max_bundle_size=size;}
-    public long getMaxBundleTimeout() {return max_bundle_timeout;}
-    public void setMaxBundleTimeout(long timeout) {max_bundle_timeout=timeout;}
-
-    /* ----------------------- Receiving of MCAST UDP packets ------------------------ */
-
-    public void run() {
-        DatagramPacket  packet;
-        byte            receive_buf[]=new byte[65535];
-        int             len, sender_port;
-        byte[]          tmp, data;
-        InetAddress     sender_addr;
-
-        // moved out of loop to avoid excessive object creations (bela March 8 2001)
-        packet=new DatagramPacket(receive_buf, receive_buf.length);
-
-        while(mcast_receiver != null && mcast_recv_sock != null) {
-            try {
-                packet.setData(receive_buf, 0, receive_buf.length);
-                mcast_recv_sock.receive(packet);
-                sender_addr=packet.getAddress();
-                sender_port=packet.getPort();
-                len=packet.getLength();
-                data=packet.getData();
-
-                if(len == 4) {  // received a diagnostics probe
-                    if(data[0] == 'd' && data[1] == 'i' && data[2] == 'a' && data[3] == 'g') {
-                        handleDiagnosticProbe(sender_addr, sender_port);
-                        continue;
-                    }
-                }
-
-                if(log.isTraceEnabled()){
-                    StringBuffer sb=new StringBuffer("received (mcast) ");
-                    sb.append(len).append(" bytes from ").append(sender_addr).append(':');
-                    sb.append(sender_port).append(" (size=").append(len).append(" bytes)");
-                    log.trace(sb.toString());
-                }
-                if(len > receive_buf.length) {
-                    if(log.isErrorEnabled()) log.error("size of the received packet (" + len + ") is bigger than " +
-                                             "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " +
-                                             "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length);
-                }
-
-                if(use_incoming_packet_handler) {
-                    tmp=new byte[len];
-                    System.arraycopy(data, 0, tmp, 0, len);
-                    incoming_queue.add(new IncomingQueueEntry(mcast_addr, sender_addr, sender_port, tmp));
-                }
-                else
-                    handleIncomingUdpPacket(mcast_addr, sender_addr, sender_port, data);
-            }
-            catch(SocketException sock_ex) {
-                 if(log.isTraceEnabled()) log.trace("multicast socket is closed, exception=" + sock_ex);
-                break;
-            }
-            catch(InterruptedIOException io_ex) { // thread was interrupted
-                ; // go back to top of loop, where we will terminate loop
-            }
-            catch(Throwable ex) {
-                if(log.isErrorEnabled())
-                    log.error("failure in multicast receive()", ex);
-                Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !)
-            }
-        }
-        if(log.isDebugEnabled()) log.debug("multicast thread terminated");
-    }
-
-
-    /**
-     * Answers a diagnostics probe: sends the text produced by getDiagResponse() to the given
-     * address and port through <tt>sock</tt>. Any failure is logged and not propagated.
-     * @param sender address the probe was received from
-     * @param port port the probe was received from
-     */
-    private void handleDiagnosticProbe(InetAddress sender, int port) {
-        try {
-            byte[]         payload=getDiagResponse().getBytes();
-            DatagramPacket reply=new DatagramPacket(payload, 0, payload.length, sender, port);
-            if(log.isDebugEnabled()) {
-                log.debug("sending diag response to " + sender + ':' + port);
-            }
-            sock.send(reply);
-        }
-        catch(Throwable t) {
-            if(log.isErrorEnabled()) {
-                log.error("failed sending diag rsp to " + sender + ':' + port +
-                          ", exception=" + t);
-            }
-        }
-    }
-
-    /**
-     * Builds the text sent in reply to a diagnostics probe: local address and channel name,
-     * multicast address and port, version information, bind address and port, and the
-     * current member list, one item per line.
-     */
-    private String getDiagResponse() {
-        StringBuffer rsp=new StringBuffer();
-
-        // line 1: who we are and which multicast endpoint we use
-        rsp.append(local_addr);
-        rsp.append(" (").append(channel_name).append(')');
-        rsp.append(" [").append(mcast_addr_name).append(':').append(mcast_port).append("]\n");
-
-        // line 2: version information
-        rsp.append("Version=").append(Version.description);
-        rsp.append(", cvs=\"").append(Version.cvs).append("\"\n");
-
-        // lines 3 and 4: bind endpoint and membership
-        rsp.append("bound to ").append(bind_addr).append(':').append(bind_port).append('\n');
-        rsp.append("members: ").append(members).append('\n');
-
-        return rsp.toString();
-    }
-
-    /* ------------------------------------------------------------------------------- */
-
-
-
-    /*------------------------------ Protocol interface ------------------------------ */
-
-    /** Returns the protocol name, i.e. the constant "UDP". */
-    public String getName() {
-        return UDP.name;
-    }
-
-
-    /**
-     * Creates the queues and handler objects for whichever packet handlers are enabled.
-     * The bundling handler is only chosen when both use_outgoing_packet_handler and
-     * enable_bundling are set, and it requires the timer of the protocol stack.
-     * @throws Exception if bundling is enabled but no timer could be obtained from the stack
-     */
-    public void init() throws Exception {
-        if(use_incoming_packet_handler) {
-            incoming_queue=new Queue();
-            incoming_packet_handler=new IncomingPacketHandler();
-        }
-
-        if(!use_outgoing_packet_handler)
-            return;
-
-        outgoing_queue=new Queue();
-        if(!enable_bundling) {
-            outgoing_packet_handler=new OutgoingPacketHandler();
-            return;
-        }
-
-        if(stack != null)
-            timer=stack.timer;
-        else
-            timer=null;
-        if(timer == null)
-            throw new Exception("timer could not be retrieved");
-        outgoing_packet_handler=new BundlingOutgoingPacketHandler();
-    }
-
-
-    /**
-     * Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads
-     */
-    public void start() throws Exception {
-        if(log.isDebugEnabled()) log.debug("creating sockets and starting threads");
-        createSockets();
-        passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
-        startThreads();
-    }
-
-
-    /**
-     * Stops the transport by stopping the threads and then closing the sockets.
-     */
-    public void stop() {
-        if(log.isDebugEnabled()) {
-            log.debug("closing sockets and stopping threads");
-        }
-        // stopThreads() already closes the sockets; closeSockets() is kept as a harmless safety net
-        stopThreads();
-        closeSockets();
-    }
-
-
-
-    /**
-     * Sets up the protocol instance according to the given properties. Every recognized
-     * property is parsed, stored in the corresponding field and removed from <tt>props</tt>.
-     * <tt>super.setProperties(props)</tt> is called first; its result is not evaluated.
-     * <p>
-     * Recognized properties (defaults in parentheses):
-     * <ul>
-     * <li>bind_addr - interface to bind to. The system property <tt>bind.address</tt> takes
-     *     precedence unless the system property <tt>ignore.bind.address</tt> is true</li>
-     * <li>bind_to_all_interfaces (false)</li>
-     * <li>bind_port, start_port (0) - both set the bind port; start_port is read last</li>
-     * <li>num_last_ports (100), port_range (1)</li>
-     * <li>mcast_addr (228.8.8.8), mcast_port (7600)</li>
-     * <li>ip_mcast (true), ip_ttl (64), tos (0)</li>
-     * <li>mcast_send_buf_size (32000), mcast_recv_buf_size (64000),
-     *     ucast_send_buf_size (32000), ucast_recv_buf_size (64000)</li>
-     * <li>loopback (true), discard_incompatible_packets (false)</li>
-     * <li>use_incoming_packet_handler (false); use_packet_handler is a deprecated alias</li>
-     * <li>use_outgoing_packet_handler (false)</li>
-     * <li>max_bundle_size - must be greater than 0 and not greater than the current value</li>
-     * <li>max_bundle_timeout (20) - must be greater than 0</li>
-     * <li>enable_bundling (false) - if true, use_outgoing_packet_handler is forced to true</li>
-     * <li>use_addr_translation (false), null_src_addresses (true)</li>
-     * </ul>
-     * Properties handled before a failure stay applied and removed.
-     *
-     * @param props the properties to read; recognized keys are removed
-     * @return true if no other properties are left.
-     *         false if the bind address cannot be resolved, a bundling value is invalid, or
-     *         properties are left over that are not handled by this protocol
-     * @throws NumberFormatException if a numeric property cannot be parsed
-     */
-    public boolean setProperties(Properties props) {
-        String str;
-        String tmp=null;
-
-        super.setProperties(props);
-
-        // PropertyPermission not granted if running in an untrusted environment with JNLP.
-        try {
-            tmp=System.getProperty("bind.address");
-            if(Boolean.getBoolean(IGNORE_BIND_ADDRESS_PROPERTY)) {
-                tmp=null;
-            }
-        }
-        catch(SecurityException ignored) {
-            // deliberately ignored: without permission we fall back to the bind_addr property
-        }
-
-        if(tmp != null)
-            str=tmp;
-        else
-            str=props.getProperty("bind_addr");
-        if(str != null) {
-            try {
-                bind_addr=InetAddress.getByName(str);
-            }
-            catch(UnknownHostException unknown) {
-                if(log.isFatalEnabled()) log.fatal("(bind_addr): host " + str + " not known");
-                return false;
-            }
-            props.remove("bind_addr");
-        }
-
-        str=props.getProperty("bind_to_all_interfaces");
-        if(str != null) {
-            // Boolean.valueOf() instead of new Boolean(), like all other boolean properties
-            bind_to_all_interfaces=Boolean.valueOf(str).booleanValue();
-            props.remove("bind_to_all_interfaces");
-        }
-
-        str=props.getProperty("bind_port");
-        if(str != null) {
-            bind_port=Integer.parseInt(str);
-            props.remove("bind_port");
-        }
-
-        str=props.getProperty("num_last_ports");
-        if(str != null) {
-            num_last_ports=Integer.parseInt(str);
-            props.remove("num_last_ports");
-        }
-
-        // start_port is a second name for bind_port; being read later, it wins if both are set
-        str=props.getProperty("start_port");
-        if(str != null) {
-            bind_port=Integer.parseInt(str);
-            props.remove("start_port");
-        }
-
-        str=props.getProperty("port_range");
-        if(str != null) {
-            port_range=Integer.parseInt(str);
-            props.remove("port_range");
-        }
-
-        str=props.getProperty("mcast_addr");
-        if(str != null) {
-            mcast_addr_name=str;
-            props.remove("mcast_addr");
-        }
-
-        str=props.getProperty("mcast_port");
-        if(str != null) {
-            mcast_port=Integer.parseInt(str);
-            props.remove("mcast_port");
-        }
-
-        str=props.getProperty("ip_mcast");
-        if(str != null) {
-            ip_mcast=Boolean.valueOf(str).booleanValue();
-            props.remove("ip_mcast");
-        }
-
-        str=props.getProperty("ip_ttl");
-        if(str != null) {
-            ip_ttl=Integer.parseInt(str);
-            props.remove("ip_ttl");
-        }
-
-        str=props.getProperty("tos");
-        if(str != null) {
-            tos=Integer.parseInt(str);
-            props.remove("tos");
-        }
-
-        str=props.getProperty("mcast_send_buf_size");
-        if(str != null) {
-            mcast_send_buf_size=Integer.parseInt(str);
-            props.remove("mcast_send_buf_size");
-        }
-
-        str=props.getProperty("mcast_recv_buf_size");
-        if(str != null) {
-            mcast_recv_buf_size=Integer.parseInt(str);
-            props.remove("mcast_recv_buf_size");
-        }
-
-        str=props.getProperty("ucast_send_buf_size");
-        if(str != null) {
-            ucast_send_buf_size=Integer.parseInt(str);
-            props.remove("ucast_send_buf_size");
-        }
-
-        str=props.getProperty("ucast_recv_buf_size");
-        if(str != null) {
-            ucast_recv_buf_size=Integer.parseInt(str);
-            props.remove("ucast_recv_buf_size");
-        }
-
-        str=props.getProperty("loopback");
-        if(str != null) {
-            loopback=Boolean.valueOf(str).booleanValue();
-            props.remove("loopback");
-        }
-
-        str=props.getProperty("discard_incompatible_packets");
-        if(str != null) {
-            discard_incompatible_packets=Boolean.valueOf(str).booleanValue();
-            props.remove("discard_incompatible_packets");
-        }
-
-        // this is deprecated, just left for compatibility (use use_incoming_packet_handler)
-        str=props.getProperty("use_packet_handler");
-        if(str != null) {
-            use_incoming_packet_handler=Boolean.valueOf(str).booleanValue();
-            props.remove("use_packet_handler");
-            if(log.isWarnEnabled()) log.warn("'use_packet_handler' is deprecated; use 'use_incoming_packet_handler' instead");
-        }
-
-        str=props.getProperty("use_incoming_packet_handler");
-        if(str != null) {
-            use_incoming_packet_handler=Boolean.valueOf(str).booleanValue();
-            props.remove("use_incoming_packet_handler");
-        }
-
-        str=props.getProperty("use_outgoing_packet_handler");
-        if(str != null) {
-            use_outgoing_packet_handler=Boolean.valueOf(str).booleanValue();
-            props.remove("use_outgoing_packet_handler");
-        }
-
-        str=props.getProperty("max_bundle_size");
-        if(str != null) {
-            int bundle_size=Integer.parseInt(str);
-            if(bundle_size > max_bundle_size) {
-                if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size +
-                        ") is greater than largest UDP fragmentation size (" + max_bundle_size + ')');
-                return false;
-            }
-            if(bundle_size <= 0) {
-                if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size + ") is <= 0");
-                return false;
-            }
-            max_bundle_size=bundle_size;
-            props.remove("max_bundle_size");
-        }
-
-        str=props.getProperty("max_bundle_timeout");
-        if(str != null) {
-            // validate before assigning, so that a rejected value never ends up in the field
-            long bundle_timeout=Long.parseLong(str);
-            if(bundle_timeout <= 0) {
-                if(log.isErrorEnabled()) log.error("max_bundle_timeout of " + bundle_timeout + " is invalid");
-                return false;
-            }
-            max_bundle_timeout=bundle_timeout;
-            props.remove("max_bundle_timeout");
-        }
-
-        str=props.getProperty("enable_bundling");
-        if(str != null) {
-            enable_bundling=Boolean.valueOf(str).booleanValue();
-            props.remove("enable_bundling");
-        }
-
-        str=props.getProperty("use_addr_translation");
-        if(str != null) {
-            use_addr_translation=Boolean.valueOf(str).booleanValue();
-            props.remove("use_addr_translation");
-        }
-
-        str=props.getProperty("null_src_addresses");
-        if(str != null) {
-            null_src_addresses=Boolean.valueOf(str).booleanValue();
-            props.remove("null_src_addresses");
-        }
-
-        if(props.size() > 0) {
-            if(log.isErrorEnabled())
-                log.error("UDP.setProperties(): the following properties are not recognized: " + props);
-            return false;
-        }
-
-        if(enable_bundling) {
-            // bundling is done by the outgoing packet handler, so it is always switched on here;
-            // the warning is only emitted when this overrides the configured value
-            if(!use_outgoing_packet_handler) {
-                if(log.isWarnEnabled()) log.warn("enable_bundling is true; setting use_outgoing_packet_handler=true");
-            }
-            use_outgoing_packet_handler=true;
-        }
-
-        return true;
-    }
-
-
-
-    /**
-     * DON'T REMOVE ! Intentionally empty: this prevents the up-handler thread from being
-     * created, which would be superfluous because messages are received from the network
-     * rather than from a layer below.
-     */
-    public void startUpHandler() {
-        // intentionally left empty, see above
-    }
-
-    /**
-     * Handles an UP event. Every event is passed up; a CONFIG event is additionally
-     * applied to this protocol through handleConfigEvent() after having been passed up.
-     * @param evt - the event being sent up the stack
-     */
-    public void up(Event evt) {
-        final int type=evt.getType();
-
-        passUp(evt);
-        if(type == Event.CONFIG) {
-            if(log.isDebugEnabled()) {
-                log.debug("received CONFIG event: " + evt.getArg());
-            }
-            handleConfigEvent((HashMap)evt.getArg());
-        }
-    }
-
-    /**
-     * Caller by the layer above this layer. Usually we just put this Message
-     * into the send queue and let one or more worker threads handle it. A worker thread
-     * then removes the Message from the send queue, performs a conversion and adds the
-     * modified Message to the send queue of the layer below it, by calling Down).
-     */
-    public void down(Event evt) {
-        Message msg;
-        Object dest_addr;
-
-        if(evt.getType() != Event.MSG) {  // unless it is a message handle it and respond
-            handleDownEvent(evt);
-            return;
-        }
-
-        msg=(Message)evt.getArg();
-
-        if(channel_name != null) {
-            // added patch by Roland Kurmann (March 20 2003)
-            // msg.putHeader(name, new UdpHeader(channel_name));
-            msg.putHeader(name, udp_hdr);
-        }
-
-        dest_addr=msg.getDest();
-
-        // Because we don't call Protocol.passDown(), we notify the observer directly (e.g. PerfObserver).
-        // This way, we still have performance numbers for UDP
-        if(observer != null)
-            observer.passDown(evt);
-
-        if(dest_addr == null) { // 'null' means send to all group members
-            if(ip_mcast) {
-                if(mcast_addr == null) {
-                    if(log.isErrorEnabled()) log.error("dest address of message is null, and " +
-                                              "sending to default address fails as mcast_addr is null, too !" +
-                                              " Discarding message " + Util.printEvent(evt));
-                    return;
-                }
-                // if we want to use IP multicast, then set the destination of the message
-                msg.setDest(mcast_addr);
-            }
-            else {
-                //sends a separate UDP message to each address
-                sendMultipleUdpMessages(msg, members);
-                return;
-            }
-        }
-
-        try {
-            sendUdpMessage(msg);
-        }
-        catch(Exception e) {
-            if(log.isErrorEnabled()) log.error("exception=" + e + ", msg=" + msg + ", mcast_addr=" + mcast_addr);
-        }
-    }
-
-
-
-
-
-
-    /*--------------------------- End of Protocol interface -------------------------- */
-
-
-    /* ------------------------------ Private Methods -------------------------------- */
-
-
-
-    /**
-     * Sets our own address as sender, but only if the message has no sender yet. The address
-     * cannot be set unconditionally, as we might be sending a message on behalf of someone
-     * else, e.g. in case of retransmission when the original sender has crashed, or in a
-     * FLUSH protocol when we have to return all unstable messages with the FLUSH_OK response.
-     */
-    private void setSourceAddress(Message msg) {
-        if(msg.getSrc() != null)
-            return;
-        msg.setSrc(local_addr);
-    }
-
-
-
-
-    /**
-     * Processes a packet read from either the multicast or the unicast socket. The buffer
-     * starts with a version (read as a short) followed by a boolean telling whether a list
-     * of messages or a single message follows; each resulting message is given to
-     * handleMessage(). Packets with a different version are logged and, if
-     * discard_incompatible_packets is set, dropped. Errors are logged, never propagated.
-     * <p>
-     * The method is not synchronized (the keyword was removed April 19 2005): its working
-     * variables are all locals.
-     * @param dest destination handed on to the unmarshalling helpers
-     * @param sender address of the sender of the packet
-     * @param port port of the sender of the packet
-     * @param data buffer holding the packet
-     */
-    void handleIncomingUdpPacket(IpAddress dest, InetAddress sender, int port, byte[] data) {
-        ByteArrayInputStream bytes_in=null;
-        DataInputStream      in=null;
-
-        try {
-            bytes_in=new ByteArrayInputStream(data);
-            in=new DataInputStream(bytes_in);
-
-            // the packet starts with the version info
-            short version=in.readShort();
-            if(!Version.compareTo(version)) {
-                if(log.isWarnEnabled()) {
-                    StringBuffer warning=new StringBuffer();
-                    warning.append("packet from ").append(sender).append(':').append(port);
-                    warning.append(" has different version (").append(version);
-                    warning.append(") from ours (").append(Version.printVersion()).append("). ");
-                    warning.append(discard_incompatible_packets? "Packet is discarded" : "This may cause problems");
-                    log.warn(warning.toString());
-                }
-                if(discard_incompatible_packets)
-                    return;
-            }
-
-            if(in.readBoolean()) {
-                // a list of messages (used if bundling is enabled): a failure in one message
-                // does not prevent the remaining ones from being handled
-                Enumeration msgs=bufferToList(in, dest).elements();
-                while(msgs.hasMoreElements()) {
-                    Message bundled=(Message)msgs.nextElement();
-                    try {
-                        handleMessage(bundled);
-                    }
-                    catch(Throwable t) {
-                        if(log.isErrorEnabled()) {
-                            log.error("failed unmarshalling message list", t);
-                        }
-                    }
-                }
-            }
-            else {
-                handleMessage(bufferToMessage(in, dest, sender, port));
-            }
-        }
-        catch(Throwable e) {
-            if(log.isErrorEnabled()) {
-                log.error("exception in processing incoming packet", e);
-            }
-        }
-        finally {
-            Util.closeInputStream(in);
-            Util.closeInputStream(bytes_in);
-        }
-    }
-
-
-
-    /**
-     * Handles one unmarshalled message: updates the receive counters (if stats is set),
-     * drops our own multicast loopback copy (if loopback is set), notifies the observer,
-     * drops messages whose UDP header names a different group, and passes everything else
-     * up as a MSG event. A message without UDP header is logged as an error but still
-     * passed up.
-     * @param msg the message to handle
-     */
-    void handleMessage(Message msg) {
-        Address dst=msg.getDest();
-        if(dst == null)
-            dst=mcast_addr;
-
-        if(stats) {
-            num_msgs_received++;
-            num_bytes_received+=msg.getLength();
-        }
-
-        // discard my own multicast loopback copy
-        if(loopback) {
-            Address src=msg.getSrc();
-            boolean to_group=dst == null || dst.isMulticastAddress();
-            if(to_group && src != null && local_addr.equals(src)) {
-                if(log.isTraceEnabled()) {
-                    log.trace("discarded own loopback multicast packet");
-                }
-                return;
-            }
-        }
-
-        Event evt=new Event(Event.MSG, msg);
-        if(log.isTraceEnabled()) {
-            StringBuffer trace=new StringBuffer("message is ");
-            trace.append(msg);
-            trace.append(", headers are ");
-            trace.append(msg.getHeaders());
-            log.trace(trace.toString());
-        }
-
-        /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
-        * This allows e.g. PerfObserver to get the time of reception of a message */
-        if(observer != null) {
-            observer.up(evt, up_queue.size());
-        }
-
-        UdpHeader hdr=(UdpHeader)msg.getHeader(name); // the header is read, not removed
-        if(hdr == null) {
-            if(log.isErrorEnabled()) {
-                log.error("message does not have a UDP header");
-            }
-        }
-        else {
-            // Discard if message's group name is not the same as our group name unless the
-            // message is a diagnosis message (special group name DIAG_GROUP)
-            String  ch_name=hdr.channel_name;
-            boolean other_group=ch_name != null && channel_name != null &&
-                    !channel_name.equals(ch_name) && !ch_name.equals(Util.DIAG_GROUP);
-            if(other_group) {
-                if(log.isWarnEnabled()) {
-                    log.warn("discarded message from different group (" +
-                             ch_name + "). Sender was " + msg.getSrc());
-                }
-                return;
-            }
-        }
-        passUp(evt);
-    }
-
-
-    void sendUdpMessage(Message msg) throws Exception {
-        sendUdpMessage(msg, false);
-    }
-
-    /**
-     * Send a message to the address specified in its destination field.
-     * With loopback enabled, a copy of a message addressed to ourselves or to a multicast address
-     * is passed up the stack immediately; only multicast messages are then also put on the wire.
-     * @param msg the message to send; its destination must be set
-     * @param copyForOutgoingQueue if true and the outgoing packet handler is in use, a copy of msg
-     *                             (rather than msg itself) is queued
-     */
-    void sendUdpMessage(Message msg, boolean copyForOutgoingQueue) throws Exception {
-        IpAddress target=(IpAddress)msg.getDest();  // guaranteed to be non-null
-        setSourceAddress(msg);
-
-        if(log.isTraceEnabled())
-            log.trace("sending msg to " + msg.getDest() + " (src=" + msg.getSrc() +
-                      "), headers are " + msg.getHeaders());
-
-        // A message for the local address is only looped back, never sent.
-        // A multicast message is looped back AND multicast; the copy we later receive from the
-        // network is discarded by handleMessage()
-        if(loopback && (target.equals(local_addr) || target.isMulticastAddress())) {
-            Message looped=msg.copy(); // the UDP header is deliberately kept on the copy
-            looped.setSrc(local_addr);
-            Event up_evt=new Event(Event.MSG, looped);
-
-            /* Protocol.up() is never invoked by this bottommost layer, so the observer is notified
-               here directly. This allows e.g. PerfObserver to get the time of reception of a message */
-            if(observer != null)
-                observer.up(up_evt, up_queue.size());
-            if(log.isTraceEnabled()) log.trace("looped back local message " + looped);
-            passUp(up_evt);
-            if(!target.isMulticastAddress())
-                return;
-        }
-
-        if(!use_outgoing_packet_handler) {
-            send(msg);
-            return;
-        }
-
-        outgoing_queue.add(copyForOutgoingQueue? msg.copy() : msg);
-    }
-
-
-    /**
-     * Internal method to serialize and send a message. Marshalling and transmission both happen
-     * while holding the monitor of out_stream, which is the buffer the message is written into.
-     * @param msg the message to send; its destination must be set
-     */
-    void send(Message msg) throws Exception {
-        IpAddress target=(IpAddress)msg.getDest(); // guaranteed to be non-null
-        IpAddress origin=(IpAddress)msg.getSrc();
-
-        synchronized(out_stream) {
-            Buffer wire=messageToBuffer(msg, target, origin);
-            doSend(wire, target.getIpAddress(), target.getPort());
-        }
-    }
-
-
-
-    /**
-     * Wraps the buffer in a datagram and sends it. Multicast destinations go out on
-     * mcast_send_sock when that socket exists; everything else goes out on the unicast socket.
-     * If the chosen socket is null the packet is silently dropped.
-     * @param buf  the marshalled data (backing array, offset and length)
-     * @param dest the address to send to
-     * @param port the port to send to
-     */
-    void doSend(Buffer buf, InetAddress dest, int port) throws IOException {
-        DatagramPacket datagram=new DatagramPacket(buf.getBuf(), buf.getOffset(), buf.getLength(), dest, port);
-        if(stats) {
-            num_msgs_sent++;
-            num_bytes_sent+=buf.getLength();
-        }
-
-        // mcast_send_sock is checked for null here; in that case the unicast socket is used instead
-        DatagramSocket out_sock=(dest.isMulticastAddress() && mcast_send_sock != null)? mcast_send_sock : sock;
-        if(out_sock != null)
-            out_sock.send(datagram);
-    }
-
-
-
-    /**
-     * Sends the same message to every address in dests, one unicast at a time. The destination
-     * of msg is overwritten for each recipient; a failure for one recipient is logged and does
-     * not stop delivery to the remaining ones.
-     * @param msg   the message to send
-     * @param dests the recipients (Address elements)
-     */
-    void sendMultipleUdpMessages(Message msg, Vector dests) {
-        for(int idx=0; idx < dests.size(); idx++) {
-            msg.setDest((Address)dests.elementAt(idx));
-            try {
-                // msg is reused for the next recipient, so the outgoing queue (if enabled) gets a copy
-                sendUdpMessage(msg, true);
-            }
-            catch(Exception send_ex) {
-                if(log.isErrorEnabled()) log.error("failed sending multiple messages", send_ex);
-            }
-        }
-    }
-
-
-    /**
-     * Marshals a single message into out_stream and returns a Buffer over the stream's raw bytes.
-     * The caller must hold the monitor of out_stream, because the stream is reset and reused here.
-     * The message's addresses are nulled/shortened for the wire and are restored afterwards,
-     * even when marshalling fails.
-     * @param msg  the message to marshal
-     * @param dest the destination to restore on msg after marshalling
-     * @param src  the source to restore on msg after marshalling (may be null)
-     * @return a Buffer referring to out_stream's backing array, from offset 0 to the written size
-     * @throws Exception if writing the message fails
-     */
-    private Buffer messageToBuffer(Message msg, IpAddress dest, IpAddress src) throws Exception {
-        DataOutputStream out=null;
-
-        try {
-            out_stream.reset();
-            out=new DataOutputStream(out_stream);
-            out.writeShort(Version.version); // write the version
-            out.writeBoolean(false); // single message, *not* a list of messages
-            nullAddresses(msg, dest, src);
-            try {
-                msg.writeTo(out);
-            }
-            finally {
-                // always restore: otherwise a failed write would leave msg with nulled addresses
-                revertAddresses(msg, dest, src);
-            }
-            out.flush();
-            return new Buffer(out_stream.getRawBuffer(), 0, out_stream.size());
-        }
-        finally {
-            Util.closeOutputStream(out);
-        }
-    }
-
-
-    /**
-     * Strips address information from msg before it is marshalled: the destination is always
-     * cleared; the source is replaced by a port-only address when null_src_addresses is set,
-     * keeping any additional data the original source carried.
-     * @param msg  the message whose addresses are modified in place
-     * @param dest the real destination (only used to tell unicast from multicast)
-     * @param src  the real source, may be null
-     */
-    private void nullAddresses(Message msg, IpAddress dest, IpAddress src) {
-        msg.setDest(null);
-        boolean multicast=dest.isMulticastAddress();
-
-        if(src == null) {
-            if(!multicast)
-                msg.setSrc(null); // unicast without a source: make sure none is sent
-            return;
-        }
-
-        // identical treatment for unicast and multicast once a source is present
-        if(null_src_addresses)
-            msg.setSrc(new IpAddress(src.getPort(), false)); // null the host part, leave the port
-        if(src.getAdditionalData() != null)
-            ((IpAddress)msg.getSrc()).setAdditionalData(src.getAdditionalData());
-    }
-
-    /** Puts the given source and destination back on msg (undoes the address nulling done for marshalling) */
-    private void revertAddresses(Message msg, IpAddress dest, IpAddress src) {
-        msg.setSrc(src);
-        msg.setDest(dest);
-    }
-
-
-    /**
-     * Reads one message from the stream and fills in the addresses that were left out on the wire.
-     * @param instream the stream positioned at the start of a marshalled message
-     * @param dest     the address the packet was received on
-     * @param sender   the IP address of the packet's sender
-     * @param port     the port of the packet's sender
-     * @return the unmarshalled message
-     */
-    private Message bufferToMessage(DataInputStream instream, IpAddress dest, InetAddress sender, int port)
-            throws Exception {
-        Message received=new Message();
-        received.readFrom(instream);
-        setAddresses(received, dest, sender, port);
-        return received;
-    }
-
-
-    /**
-     * Completes the addresses of a received message: a missing destination is set to dest, a
-     * missing source is built from the packet's sender and port, and a source without a host
-     * part gets the sender's host while keeping its own port and additional data.
-     * Failures while creating the replacement source address are ignored.
-     */
-    private void setAddresses(Message msg, IpAddress dest, InetAddress sender, int port) {
-        // set the destination address if not set
-        if(msg.getDest() == null && dest != null)
-            msg.setDest(dest);
-
-        IpAddress wire_src=(IpAddress)msg.getSrc();
-
-        // no source at all: use the datagram's sender
-        if(wire_src == null) {
-            try {
-                msg.setSrc(new IpAddress(sender, port));
-            }
-            catch(Throwable ignored) {
-            }
-            return;
-        }
-
-        // source present: fill in the host part if it was nulled, and carry over the additional data
-        byte[] extra=wire_src.getAdditionalData();
-        if(wire_src.getIpAddress() == null) {
-            try {
-                msg.setSrc(new IpAddress(sender, wire_src.getPort()));
-            }
-            catch(Throwable ignored) {
-            }
-        }
-        if(extra != null)
-            ((IpAddress)msg.getSrc()).setAdditionalData(extra);
-    }
-
-
-
-    /**
-     * Marshals a list of messages into out_stream: version, a 'true' list marker, the number of
-     * messages, the source address of the first message (written once), then every message with
-     * its source and destination nulled. Each message's addresses are restored after it has been
-     * written, even when writing fails. A null list is marshalled like an empty one.
-     * out_stream is reset and reused here.
-     * @param l    the messages to marshal, may be null
-     * @param dest the destination to restore on every message
-     * @return a Buffer referring to out_stream's backing array, from offset 0 to the written size
-     * @throws Exception if writing fails
-     */
-    private Buffer listToBuffer(List l, IpAddress dest) throws Exception {
-        int len=l != null? l.size() : 0;
-        boolean src_written=false;
-        DataOutputStream out=null;
-        out_stream.reset();
-
-        try {
-            out=new DataOutputStream(out_stream);
-            out.writeShort(Version.version);
-            out.writeBoolean(true);
-            out.writeInt(len);
-
-            if(l != null) { // the length computation tolerates null, so the iteration must as well
-                for(Enumeration en=l.elements(); en.hasMoreElements();) {
-                    Message msg=(Message)en.nextElement();
-                    IpAddress src=(IpAddress)msg.getSrc();
-                    if(!src_written) {
-                        Util.writeAddress(src, out);
-                        src_written=true;
-                    }
-                    msg.setDest(null);
-                    msg.setSrc(null);
-                    try {
-                        msg.writeTo(out);
-                    }
-                    finally {
-                        // always restore: a failed write must not leave msg without addresses
-                        revertAddresses(msg, dest, src);
-                    }
-                }
-            }
-            out.flush();
-            return new Buffer(out_stream.getRawBuffer(), 0, out_stream.size());
-        }
-        finally {
-            Util.closeOutputStream(out);
-        }
-    }
-
-    /**
-     * Unmarshals a list of messages written by listToBuffer(): the message count, the shared
-     * source address, then the messages themselves. Every message gets dest as destination and
-     * the shared address as source. The stream is not closed here; it belongs to the caller.
-     * @param instream the stream positioned right after the list marker
-     * @param dest     the address the packet was received on
-     * @return the unmarshalled messages (empty if the count is zero or negative)
-     * @throws Exception if reading fails
-     */
-    private List bufferToList(DataInputStream instream, IpAddress dest) throws Exception {
-        List l=new List();
-        int  len=instream.readInt();
-
-        // listToBuffer() writes the source address together with the first message only, so an
-        // empty list carries no address and none must be read
-        if(len <= 0)
-            return l;
-
-        Address src=Util.readAddress(instream);
-        for(int i=0; i < len; i++) {
-            Message msg=new Message();
-            msg.readFrom(instream);
-            msg.setDest(dest);
-            msg.setSrc(src);
-            l.add(msg);
-        }
-        return l;
-    }
-
-
-
-    /**
-     * Create UDP sender and receiver sockets. Currently there are 2 sockets
-     * (sending and receiving). This is due to Linux's non-BSD compatibility
-     * in the JDK port (see DESIGN).
-     * Sets sock, local_addr and, when ip_mcast is enabled, mcast_recv_sock, mcast_addr and
-     * mcast_send_sock.
-     * @throws Exception if the unicast socket could not be created or a socket operation fails
-     */
-    void createSockets() throws Exception {
-        InetAddress tmp_addr=null;
-
-        // bind_addr not set, try to assign one by default. This is needed on Windows
-
-        // changed by bela Feb 12 2003: by default multicast sockets will be bound to all network interfaces
-
-        // CHANGED *BACK* by bela March 13 2003: binding to all interfaces did not result in a correct
-        // local_addr. As a matter of fact, comparison between e.g. 0.0.0.0:1234 (on hostA) and
-        // 0.0.0.0:1.2.3.4 (on hostB) would fail !
-        if(bind_addr == null) {
-            InetAddress[] interfaces=InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
-            if(interfaces != null && interfaces.length > 0)
-                bind_addr=interfaces[0];
-        }
-        if(bind_addr == null)
-            bind_addr=InetAddress.getLocalHost();
-
-        if(bind_addr != null)
-            if(log.isInfoEnabled()) log.info("sockets will use interface " + bind_addr.getHostAddress());
-
-
-        // 2. Create socket for receiving unicast UDP packets. The address and port
-        //    of this socket will be our local address (local_addr)
-        if(bind_port > 0) {
-            sock=createDatagramSocketWithBindPort();
-        }
-        else {
-            sock=createEphemeralDatagramSocket();
-        }
-
-        // must be checked before the socket is used for the first time below
-        if(sock == null)
-            throw new Exception("UDP.createSocket(): sock is null");
-
-        if(tos > 0) {
-            try {
-                sock.setTrafficClass(tos);
-            }
-            catch(SocketException e) {
-                log.warn("traffic class of " + tos + " could not be set, will be ignored", e);
-            }
-        }
-
-        local_addr=new IpAddress(sock.getLocalAddress(), sock.getLocalPort());
-        if(additional_data != null)
-            local_addr.setAdditionalData(additional_data);
-
-
-        // 3. Create socket for receiving IP multicast packets
-        if(ip_mcast) {
-            // 3a. Create mcast receiver socket
-            mcast_recv_sock=new MulticastSocket(mcast_port);
-            mcast_recv_sock.setTimeToLive(ip_ttl);
-            tmp_addr=InetAddress.getByName(mcast_addr_name);
-            mcast_addr=new IpAddress(tmp_addr, mcast_port);
-
-            if(bind_to_all_interfaces) {
-                bindToAllInterfaces(mcast_recv_sock, mcast_addr.getIpAddress());
-            }
-            else {
-                if(bind_addr != null)
-                    mcast_recv_sock.setInterface(bind_addr);
-                mcast_recv_sock.joinGroup(tmp_addr);
-            }
-
-            // 3b. Create mcast sender socket
-            mcast_send_sock=new MulticastSocket();
-            mcast_send_sock.setTimeToLive(ip_ttl);
-            if(bind_addr != null)
-                mcast_send_sock.setInterface(bind_addr);
-
-            if(tos > 0) {
-                try {
-                    mcast_send_sock.setTrafficClass(tos); // high throughput
-                }
-                catch(SocketException e) {
-                    log.warn("traffic class of " + tos + " could not be set, will be ignored", e);
-                }
-            }
-        }
-
-        setBufferSizes();
-        if(log.isInfoEnabled()) log.info("socket information:\n" + dumpSocketInfo());
-    }
-
-
-    /**
-     * Joins the multicast group (mcastAddr, mcast_port) on every network interface that has at
-     * least one address assigned. The group is joined once per interface; the interface's first
-     * address is only used for the trace message.
-     * @param s         the multicast socket that joins the group
-     * @param mcastAddr the multicast group address
-     */
-    private void bindToAllInterfaces(MulticastSocket s, InetAddress mcastAddr) throws IOException {
-        SocketAddress group=new InetSocketAddress(mcastAddr, mcast_port);
-        for(Enumeration nics=NetworkInterface.getNetworkInterfaces(); nics.hasMoreElements();) {
-            NetworkInterface nic=(NetworkInterface)nics.nextElement();
-            Enumeration nic_addrs=nic.getInetAddresses();
-            if(!nic_addrs.hasMoreElements())
-                continue; // interface without addresses: nothing to join on
-
-            // loopback addresses are not skipped
-            InetAddress first_addr=(InetAddress)nic_addrs.nextElement();
-            s.joinGroup(group, nic);
-            if(log.isTraceEnabled())
-                log.trace("joined " + group + " on interface " + nic.getName() + " (" + first_addr + ")");
-        }
-    }
-
-
-    /**
-     * Creates a DatagramSocket with a random port. Because in certain operating systems, ports are reused,
-     * we keep a list of the n last used ports, and avoid port reuse. When num_last_ports is not
-     * positive, the first socket obtained is returned without consulting that list.
-     * @return the newly created socket, bound to bind_addr
-     */
-    private DatagramSocket createEphemeralDatagramSocket() throws SocketException {
-        int candidate_port=0; // 0 on the first attempt: let the OS choose
-        for(;;) {
-            DatagramSocket candidate=new DatagramSocket(candidate_port, bind_addr);
-            if(num_last_ports <= 0)
-                return candidate;
-
-            candidate_port=candidate.getLocalPort();
-            if(!getLastPortsUsed().contains(new Integer(candidate_port))) {
-                getLastPortsUsed().add(new Integer(candidate_port));
-                return candidate;
-            }
-
-            // port was handed out before: give it back and try the next higher one
-            if(log.isDebugEnabled())
-                log.debug("local port " + candidate_port + " already seen in this session; will try to get other port");
-            try {
-                candidate.close();
-            }
-            catch(Throwable ignored) {
-            }
-            candidate_port++;
-        }
-    }
-
-
-
-
-    /**
-     * Creates a DatagramSocket when bind_port > 0. Attempts to allocate the socket with port == bind_port, and
-     * increments until it finds a valid port, or until port_range has been exceeded.
-     * Never returns null: if no port in [bind_port, bind_port + port_range] can be bound (or the
-     * range is empty), an exception is thrown which carries the last bind failure as its cause.
-     * @return DatagramSocket The newly created socket
-     * @throws Exception if no socket could be created on any port in the range
-     */
-    private DatagramSocket createDatagramSocketWithBindPort() throws Exception {
-        // 27-6-2003 bgooren, find available port in range (start_port, start_port+port_range)
-        int max_port=bind_port + port_range;
-        Exception last_failure=null;
-
-        for(int rcv_port=bind_port; rcv_port <= max_port; rcv_port++) {
-            try {
-                return new DatagramSocket(rcv_port, bind_addr);
-            }
-            catch(SocketException bind_ex) {    // Cannot listen on this port
-                last_failure=bind_ex;
-            }
-            catch(SecurityException sec_ex) {   // Not allowed to listen on this port
-                last_failure=sec_ex;
-            }
-        }
-
-        // Cannot listen at all, throw an Exception
-        throw new Exception("UDP.createSockets(): cannot listen on any port in range " +
-                bind_port + '-' + max_port, last_failure);
-    }
-
-
-    /**
-     * Builds a multi-line, human-readable summary of the configured addresses and of every
-     * socket that currently exists (bound address, port and buffer sizes).
-     * @return the summary text
-     * @throws Exception if querying a socket fails
-     */
-    private String dumpSocketInfo() throws Exception {
-        StringBuffer info=new StringBuffer(128);
-        info.append("local_addr=" + local_addr);
-        info.append(", mcast_addr=" + mcast_addr);
-        info.append(", bind_addr=" + bind_addr);
-        info.append(", ttl=" + ip_ttl);
-
-        if(sock != null) {
-            info.append("\nsock: bound to ");
-            info.append(sock.getLocalAddress().getHostAddress() + ':' + sock.getLocalPort());
-            info.append(", receive buffer size=" + sock.getReceiveBufferSize());
-            info.append(", send buffer size=" + sock.getSendBufferSize());
-        }
-
-        if(mcast_recv_sock != null) {
-            info.append("\nmcast_recv_sock: bound to ");
-            info.append(mcast_recv_sock.getInterface().getHostAddress() + ':' + mcast_recv_sock.getLocalPort());
-            info.append(", send buffer size=" + mcast_recv_sock.getSendBufferSize());
-            info.append(", receive buffer size=" + mcast_recv_sock.getReceiveBufferSize());
-        }
-
-        if(mcast_send_sock != null) {
-            info.append("\nmcast_send_sock: bound to ");
-            info.append(mcast_send_sock.getInterface().getHostAddress() + ':' + mcast_send_sock.getLocalPort());
-            info.append(", send buffer size=" + mcast_send_sock.getSendBufferSize());
-            info.append(", receive buffer size=" + mcast_send_sock.getReceiveBufferSize());
-        }
-        return info.toString();
-    }
-
-
-    /**
-     * Applies the configured send and receive buffer sizes to every socket that currently exists.
-     * Each setting is attempted independently; a failure is logged as a warning and does not
-     * prevent the remaining settings from being applied.
-     */
-    void setBufferSizes() {
-        // unicast socket
-        if(sock != null) {
-            try {
-                sock.setSendBufferSize(ucast_send_buf_size);
-            }
-            catch(Throwable failure) {
-                if(log.isWarnEnabled()) log.warn("failed setting ucast_send_buf_size in sock: " + failure);
-            }
-            try {
-                sock.setReceiveBufferSize(ucast_recv_buf_size);
-            }
-            catch(Throwable failure) {
-                if(log.isWarnEnabled()) log.warn("failed setting ucast_recv_buf_size in sock: " + failure);
-            }
-        }
-
-        // multicast receive socket
-        if(mcast_recv_sock != null) {
-            try {
-                mcast_recv_sock.setSendBufferSize(mcast_send_buf_size);
-            }
-            catch(Throwable failure) {
-                if(log.isWarnEnabled()) log.warn("failed setting mcast_send_buf_size in mcast_recv_sock: " + failure);
-            }
-            try {
-                mcast_recv_sock.setReceiveBufferSize(mcast_recv_buf_size);
-            }
-            catch(Throwable failure) {
-                if(log.isWarnEnabled()) log.warn("failed setting mcast_recv_buf_size in mcast_recv_sock: " + failure);
-            }
-        }
-
-        // multicast send socket
-        if(mcast_send_sock != null) {
-            try {
-                mcast_send_sock.setSendBufferSize(mcast_send_buf_size);
-            }
-            catch(Throwable failure) {
-                if(log.isWarnEnabled()) log.warn("failed setting mcast_send_buf_size in mcast_send_sock: " + failure);
-            }
-            try {
-                mcast_send_sock.setReceiveBufferSize(mcast_recv_buf_size);
-            }
-            catch(Throwable failure) {
-                if(log.isWarnEnabled()) log.warn("failed setting mcast_recv_buf_size in mcast_send_sock: " + failure);
-            }
-        }
-    }
-
-
-    /**
-     * Closes the multicast sockets first and the unicast socket afterwards
-     */
-    void closeSockets() {
-        closeMulticastSocket(); // multicast receive and send sockets
-        closeSocket();          // unicast socket
-    }
-
-
-    /**
-     * Leaves the multicast group and closes the multicast receive socket, then closes the
-     * multicast send socket. The receive socket is closed and cleared even when leaving the
-     * group fails, so that the socket is not leaked and the multicast receiver thread is woken up.
-     * Also clears mcast_addr when a receive socket existed.
-     */
-    void closeMulticastSocket() {
-        if(mcast_recv_sock != null) {
-            try {
-                if(mcast_addr != null) {
-                    mcast_recv_sock.leaveGroup(mcast_addr.getIpAddress());
-                }
-            }
-            catch(IOException ex) {
-                // best effort: the socket is closed below regardless
-                if(log.isWarnEnabled()) log.warn("failed leaving multicast group " + mcast_addr, ex);
-            }
-            finally {
-                mcast_recv_sock.close(); // this will cause the mcast receiver thread to break out of its loop
-                mcast_recv_sock=null;
-                mcast_addr=null;
-                if(log.isDebugEnabled()) log.debug("multicast receive socket closed");
-            }
-        }
-
-        if(mcast_send_sock != null) {
-            mcast_send_sock.close();
-            mcast_send_sock=null;
-            if(log.isDebugEnabled()) log.debug("multicast send socket closed");
-        }
-    }
-
-
-    /** Closes the unicast socket, if there is one, and clears the reference to it */
-    void closeSocket() {
-        if(sock == null)
-            return;
-
-        sock.close();
-        sock=null;
-        if(log.isDebugEnabled()) log.debug("socket closed");
-    }
-
-
-
-
-    /**
-     * Starts the unicast receiver, the multicast receiver thread (only when ip_mcast is enabled
-     * and no such thread is still alive) and the outgoing/incoming packet handlers, if configured
-     */
-    void startThreads() throws Exception {
-        // listener for the unicast socket
-        if(ucast_receiver == null) {
-            ucast_receiver=new UcastReceiver();
-            ucast_receiver.start();
-            if(log.isDebugEnabled()) log.debug("created unicast receiver thread");
-        }
-
-        if(ip_mcast) {
-            boolean still_running=mcast_receiver != null && mcast_receiver.isAlive();
-            if(still_running) {
-                if(log.isDebugEnabled()) log.debug("did not create new multicastreceiver thread as existing " +
-                                                   "multicast receiver thread is still running");
-            }
-            else {
-                // no thread yet, or the previous one has died: create a fresh one
-                mcast_receiver=new Thread(this, "UDP mcast receiver");
-                mcast_receiver.setPriority(Thread.MAX_PRIORITY); // needed ????
-                mcast_receiver.setDaemon(true);
-                mcast_receiver.start();
-            }
-        }
-
-        if(use_outgoing_packet_handler)
-            outgoing_packet_handler.start();
-        if(use_incoming_packet_handler)
-            incoming_packet_handler.start();
-    }
-
-
-    /**
-     * Stops the multicast receiver thread, the unicast receiver and both packet handlers, in that
-     * order. Stopping the multicast receiver also closes the multicast sockets.
-     */
-    void stopThreads() {
-        Thread tmp;
-
-        // 1. Stop the multicast receiver thread
-        if(mcast_receiver != null) {
-            if(mcast_receiver.isAlive()) {
-                // keep a local handle: the field is cleared before the socket is closed
-                tmp=mcast_receiver;
-                mcast_receiver=null;
-                closeMulticastSocket();  // will cause the multicast thread to terminate
-                tmp.interrupt();
-                try {
-                    // wait at most 100 ms for the thread to finish
-                    tmp.join(100);
-                }
-                catch(Exception e) {
-                    // ignored: shutdown proceeds whether or not the join completed
-                }
-                tmp=null;
-            }
-            mcast_receiver=null;
-        }
-
-        // 2. Stop the unicast receiver thread
-        if(ucast_receiver != null) {
-            ucast_receiver.stop();
-            ucast_receiver=null;
-        }
-
-        // 3. Stop the in_packet_handler thread
-        if(incoming_packet_handler != null)
-            incoming_packet_handler.stop();
-
-        // 4. Stop the outgoing packet handler thread
-        if(outgoing_packet_handler != null)
-            outgoing_packet_handler.stop();
-    }
-
-
-    /**
-     * Handles events travelling down the stack that this layer reacts to itself: view changes
-     * update the member list, GET_LOCAL_ADDRESS, CONNECT and DISCONNECT are answered by passing
-     * the matching event up, and CONFIG is applied via handleConfigEvent(). Other event types
-     * are ignored here.
-     * @param evt the event received from the layer above
-     */
-    void handleDownEvent(Event evt) {
-        switch(evt.getType()) {
-
-        case Event.TMP_VIEW:
-        case Event.VIEW_CHANGE: {
-            // replace the membership with the members of the new view
-            synchronized(members) {
-                members.removeAllElements();
-                Vector new_members=((View)evt.getArg()).getMembers();
-                members.addAll(new_members);
-            }
-            break;
-        }
-
-        case Event.GET_LOCAL_ADDRESS: {  // return local address -> Event(SET_LOCAL_ADDRESS, local)
-            Event reply=new Event(Event.SET_LOCAL_ADDRESS, local_addr);
-            passUp(reply);
-            break;
-        }
-
-        case Event.CONNECT: {
-            channel_name=(String)evt.getArg();
-            udp_hdr=new UdpHeader(channel_name);
-
-            // removed March 18 2003 (bela), not needed (handled by GMS)
-            // changed July 2 2003 (bela): we discard CONNECT_OK at the GMS level anyway, this might
-            // be needed if we run without GMS though
-            passUp(new Event(Event.CONNECT_OK));
-            break;
-        }
-
-        case Event.DISCONNECT: {
-            passUp(new Event(Event.DISCONNECT_OK));
-            break;
-        }
-
-        case Event.CONFIG: {
-            Object config=evt.getArg();
-            if(log.isDebugEnabled()) log.debug("received CONFIG event: " + config);
-            handleConfigEvent((HashMap)config);
-            break;
-        }
-        }
-    }
-
-
-    /**
-     * Applies a configuration map: "additional_data" replaces the additional data, "send_buf_size"
-     * and "recv_buf_size" set both the multicast and the unicast buffer size. The buffer sizes
-     * are then applied to the sockets. A null map is ignored.
-     * @param map the configuration entries, may be null
-     */
-    void handleConfigEvent(HashMap map) {
-        if(map == null)
-            return;
-
-        if(map.containsKey("additional_data"))
-            additional_data=(byte[])map.get("additional_data");
-
-        if(map.containsKey("send_buf_size")) {
-            Integer send_size=(Integer)map.get("send_buf_size");
-            mcast_send_buf_size=send_size.intValue();
-            ucast_send_buf_size=mcast_send_buf_size;
-        }
-
-        if(map.containsKey("recv_buf_size")) {
-            Integer recv_size=(Integer)map.get("recv_buf_size");
-            mcast_recv_buf_size=recv_size.intValue();
-            ucast_recv_buf_size=mcast_recv_buf_size;
-        }
-
-        setBufferSizes();
-    }
-
-
-
-    /* ----------------------------- End of Private Methods ---------------------------------------- */
-
-    /* ----------------------------- Inner Classes ---------------------------------------- */
-
-    /** A received packet waiting in the incoming queue: the raw bytes plus addressing information */
-    class IncomingQueueEntry {
-        /** Address the packet was received on (null if unknown) */
-        IpAddress   dest;
-        /** IP address of the packet's sender (null if unknown) */
-        InetAddress sender;
-        /** Port of the packet's sender (-1 if unknown) */
-        int         port;
-        /** The packet's payload */
-        byte[]      buf;
-
-        public IncomingQueueEntry(IpAddress dest, InetAddress sender, int port, byte[] buf) {
-            this.dest=dest;
-            this.sender=sender;
-            this.port=port;
-            this.buf=buf;
-        }
-
-        /** Creates an entry that carries only a payload, with no addressing information */
-        public IncomingQueueEntry(byte[] buf) {
-            this(null, null, -1, buf);
-        }
-    }
-
-
-
-    /**
-     * Receives datagrams on the unicast socket (sock) in its own daemon thread. Each packet is
-     * either copied and queued for the incoming packet handler or handed directly to
-     * handleIncomingUdpPacket().
-     */
-    public class UcastReceiver implements Runnable {
-        boolean running=true;   // loop flag, cleared by stop()
-        Thread thread=null;     // the receiver thread, null while not started
-
-
-        /** Creates and starts the receiver thread unless one has already been created */
-        public void start() {
-            if(thread == null) {
-                thread=new Thread(this, "UDP.UcastReceiverThread");
-                thread.setDaemon(true);
-                running=true;
-                thread.start();
-            }
-        }
-
-
-        /**
-         * Stops a live receiver thread by clearing the running flag, closing the unicast socket
-         * and interrupting the thread. The thread reference is cleared in every case.
-         */
-        public void stop() {
-            Thread tmp;
-            if(thread != null && thread.isAlive()) {
-                running=false;
-                tmp=thread;
-                thread=null;
-                closeSocket(); // this will cause the thread to break out of its loop
-                tmp.interrupt();
-                tmp=null;
-            }
-            thread=null;
-        }
-
-
-        /**
-         * Receive loop: runs until the running flag is cleared, the thread reference or the socket
-         * becomes null, or the socket reports a SocketException.
-         */
-        public void run() {
-            DatagramPacket  packet;
-            byte            receive_buf[]=new byte[65535];
-            int             len;
-            byte[]          data, tmp;
-            InetAddress     sender_addr;
-            int             sender_port;
-
-            // moved out of loop to avoid excessive object creations (bela March 8 2001)
-            packet=new DatagramPacket(receive_buf, receive_buf.length);
-
-            while(running && thread != null && sock != null) {
-                try {
-                    // reset offset and length, since receive() shrinks the packet's length
-                    packet.setData(receive_buf, 0, receive_buf.length);
-                    sock.receive(packet);
-                    sender_addr=packet.getAddress();
-                    sender_port=packet.getPort();
-                    len=packet.getLength();
-                    data=packet.getData();
-                    if(log.isTraceEnabled())
-                        log.trace(new StringBuffer("received (ucast) ").append(len).append(" bytes from ").
-                                  append(sender_addr).append(':').append(sender_port));
-                    // an oversized packet is only reported here; processing continues below
-                    if(len > receive_buf.length) {
-                        if(log.isErrorEnabled())
-                            log.error("size of the received packet (" + len + ") is bigger than allocated buffer (" +
-                                      receive_buf.length + "): will not be able to handle packet. " +
-                                      "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length);
-                    }
-
-                    if(use_incoming_packet_handler) {
-                        // the receive buffer is reused for the next packet, so queue a private copy
-                        tmp=new byte[len];
-                        System.arraycopy(data, 0, tmp, 0, len);
-                        incoming_queue.add(new IncomingQueueEntry(local_addr, sender_addr, sender_port, tmp));
-                    }
-                    else
-                        // handled synchronously; the whole receive buffer is passed, not just len bytes
-                        handleIncomingUdpPacket(local_addr, sender_addr, sender_port, data);
-                }
-                catch(SocketException sock_ex) {
-                    if(log.isDebugEnabled()) log.debug("unicast receiver socket is closed, exception=" + sock_ex);
-                    break;
-                }
-                catch(InterruptedIOException io_ex) { // thread was interrupted
-                    ; // go back to top of loop, where we will terminate loop
-                }
-                catch(Throwable ex) {
-                    if(log.isErrorEnabled())
-                        log.error("[" + local_addr + "] failed receiving unicast packet", ex);
-                    Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !)
-                }
-            }
-            if(log.isDebugEnabled()) log.debug("unicast receiver thread terminated");
-        }
-    }
-
-
-    /**
-     * This thread fetches byte buffers from the incoming_queue, converts them into messages and passes them up
-     * to the higher layer (done in handleIncomingUdpPacket()).
-     */
-    class IncomingPacketHandler implements Runnable {
-        Thread t=null;
-
-        /**
-         * Takes entries off the incoming queue until the queue is closed or removed, or the
-         * handler reference is cleared.
-         */
-        public void run() {
-            IncomingQueueEntry entry;
-
-            while(true) {
-                // stop() sets incoming_queue to null from another thread; work on a snapshot so
-                // the reference cannot vanish between the check and the call to remove()
-                Queue queue=incoming_queue;
-                if(queue == null || incoming_packet_handler == null)
-                    break;
-                try {
-                    entry=(IncomingQueueEntry)queue.remove();
-                }
-                catch(QueueClosedException closed_ex) {
-                    if(log.isDebugEnabled()) log.debug("packet_handler thread terminating");
-                    break;
-                }
-                handleIncomingUdpPacket(entry.dest, entry.sender, entry.port, entry.buf);
-            }
-        }
-
-        /** Starts the handler thread unless a previously started one is still alive */
-        void start() {
-            if(t == null || !t.isAlive()) {
-                t=new Thread(this, "UDP.IncomingPacketHandler thread");
-                t.setDaemon(true);
-                t.start();
-            }
-        }
-
-        /** Closes the incoming queue and drops the references to the thread and the queue */
-        void stop() {
-            if(incoming_queue != null)
-                incoming_queue.close(false); // should terminate the packet_handler thread too
-            t=null;
-            incoming_queue=null;
-        }
-    }
-
-
-    /**
-     * This thread fetches byte buffers from the outgoing_packet_queue, converts them into messages and sends them
-     * using the unicast or multicast socket
-     */
-    class OutgoingPacketHandler implements Runnable {
-        Thread             t=null;
-        byte[]             buf;
-        DatagramPacket     packet;
-        IpAddress          dest;
-
-        public void run() {
-            Message msg;
-
-            while(outgoing_queue != null && outgoing_packet_handler != null) {
-                try {
-                    msg=(Message)outgoing_queue.remove();
-                    handleMessage(msg);
-                }
-                catch(QueueClosedException closed_ex) {
-                    break;
-                }
-                catch(Throwable th) {
-                    if(log.isErrorEnabled()) log.error("exception sending packet", th);
-                }
-                msg=null; // let's give the poor garbage collector a hand...
-            }
-            if(log.isTraceEnabled()) log.trace("packet_handler thread terminating");
-        }
-
-        protected void handleMessage(Message msg) throws Exception {
-            send(msg);
-        }
-
-
-        void start() {
-            if(t == null || !t.isAlive()) {
-                t=new Thread(this, "UDP.OutgoingPacketHandler thread");
-                t.setDaemon(true);
-                t.start();
-            }
-        }
-
-        void stop() {
-            if(outgoing_queue != null)
-                outgoing_queue.close(false); // should terminate the packet_handler thread too
-            t=null;
-            // outgoing_queue=null;
-        }
-    }
-
-
-
-
-    /**
-     * Bundles smaller messages into bigger ones. Collects messages in a list until
-     * messages of a total of <tt>max_bundle_size bytes</tt> have accumulated, or until
-     * <tt>max_bundle_timeout</tt> milliseconds have elapsed, whichever is first. Messages
-     * are unbundled at the receiver.
-     */
-    class BundlingOutgoingPacketHandler extends OutgoingPacketHandler {
-        /** Sum of the sizes of the messages added to <tt>msgs</tt> in the current round; reset by run(). */
-        long                total_bytes=0;
-        /** HashMap<Address, List<Message>>. Keys are destinations, values are lists of Messages */
-        final HashMap       msgs=new HashMap(11);
-
-
-        /** Starts the handler thread via the superclass and then renames it. */
-        void start() {
-            super.start();
-            t.setName("UDP.BundlingOutgoingPacketHandler thread");
-        }
-
-
-        /**
-         * Main loop. Each round takes a first message (the leftover of the previous round, or a
-         * blocking remove() from the queue), lets further messages accumulate, then sends the
-         * accumulated bundle. Ends when the blocking remove() reports a closed queue or when
-         * outgoing_queue is null; whatever is still in <tt>msgs</tt> is sent before returning.
-         */
-        public void run() {
-            Message msg=null, leftover=null;
-            long start=0;
-            while(outgoing_queue != null) {
-                try {
-                    total_bytes=0;
-                    msg=leftover != null? leftover : (Message)outgoing_queue.remove(); // blocks until message is available
-                    start=System.currentTimeMillis();
-                    // a message that did not fit into this bundle becomes the first message of the next round
-                    leftover=waitForMessagesToAccumulate(msg, outgoing_queue, max_bundle_size, start, max_bundle_timeout);
-                    bundleAndSend(start);
-                }
-                catch(QueueClosedException closed_ex) {
-                    break;
-                }
-                catch(Throwable th) {
-                    if(log.isErrorEnabled()) log.error("exception sending packet", th);
-                }
-            }
-            // final flush of anything still collected in msgs
-            bundleAndSend(start);
-            if(log.isTraceEnabled()) log.trace("packet_handler thread terminating");
-        }
-
-
-        /**
-         * Waits until max_size bytes have accumulated in the queue, or max_time milliseconds have elapsed.
-         * When a message cannot be added to the ready-to-send bundle, it is returned, so the caller can
-         * re-submit it again next time.
-         * A message whose size exceeds max_bundle_size makes checkLength() throw; that exception is
-         * logged here and the message is not added to the bundle.
-         * @param m the first message to process; if null, the first message is taken from q
-         * @param q the queue from which further messages are removed, using the remaining time as timeout
-         * @param max_size the maximum number of bytes (total_bytes plus the new message) a bundle may hold
-         * @param start_time the timestamp in milliseconds from which the elapsed time is measured
-         * @param max_time the number of milliseconds after start_time at which accumulation stops;
-         *                 also the timeout used for the first removal from q
-         * @return the message that would have exceeded max_size, or null if there is none
-         */
-        Message waitForMessagesToAccumulate(Message m, Queue q, long max_size, long start_time, long max_time) {
-            Message msg, leftover=null;
-            boolean running=true, size_exceeded=false, time_reached=false;
-            long    len, time_to_wait=max_time, waited_time=0;
-
-            while(running) {
-                try {
-                    msg=m != null? m : (Message)q.remove(time_to_wait);
-                    m=null; // necessary, otherwise we get 'm' again in subsequent iterations of the same loop !
-                    len=msg.size();
-                    checkLength(len);
-                    // recompute the remaining time budget relative to start_time
-                    waited_time=System.currentTimeMillis() - start_time;
-                    time_to_wait=max_time - waited_time;
-                    size_exceeded=total_bytes + len > max_size;
-                    time_reached=time_to_wait <= 0;
-
-                    if(size_exceeded) {
-                        // msg does not fit: stop and hand it back to the caller instead of adding it
-                        running=false;
-                        leftover=msg;
-                    }
-                    else {
-                        addMessage(msg);
-                        total_bytes+=len;
-                        if(time_reached)
-                            running=false;
-                    }
-                }
-                catch(TimeoutException timeout) {
-                    // no further message arrived within the remaining time
-                    waited_time=System.currentTimeMillis() - start_time;
-                    time_reached=true;
-                    break;
-                }
-                catch(QueueClosedException closed) {
-                    break;
-                }
-                catch(Exception ex) {
-                    // includes the exception thrown by checkLength(); the loop continues with the next message
-                    log.error("failure in bundling", ex);
-                }
-            }
-            return leftover;
-        }
-
-
-        /**
-         * Rejects messages that can never fit into a bundle.
-         * @param len the size of the message
-         * @throws Exception if len is greater than max_bundle_size
-         */
-        void checkLength(long len) throws Exception {
-            if(len > max_bundle_size)
-                throw new Exception("UDP.BundlingOutgoingPacketHandler.handleMessage(): message size (" + len +
-                                    ") is greater than max bundling size (" + max_bundle_size + "). " +
-                                    "Set the fragmentation/bundle size in FRAG and UDP correctly");
-        }
-
-
-        /**
-         * Appends msg to the list kept in <tt>msgs</tt> for its destination, creating the list on
-         * first use. Runs while holding the lock on <tt>msgs</tt>.
-         */
-        void addMessage(Message msg) {
-            List    tmp;
-            Address dst=msg.getDest();
-            synchronized(msgs) {
-                tmp=(List)msgs.get(dst);
-                if(tmp == null) {
-                    tmp=new List();
-                    msgs.put(dst, tmp);
-                }
-                tmp.add(msg);
-            }
-        }
-
-
-
-        /**
-         * Sends everything collected in <tt>msgs</tt> and then empties the map. For every destination
-         * with a non-empty list, the list is turned into a Buffer by listToBuffer() and passed to
-         * doSend() while holding the lock on out_stream. A failure for one destination is logged
-         * and does not prevent the remaining destinations from being processed. Does nothing if
-         * <tt>msgs</tt> is empty. The whole operation holds the lock on <tt>msgs</tt>.
-         * @param start_time the time at which accumulation started; only used for the trace output
-         *                   (0 is replaced by the current time)
-         */
-        private void bundleAndSend(long start_time) {
-            Map.Entry      entry;
-            IpAddress      dst;
-            Buffer         buffer;
-            InetAddress    addr;
-            int            port;
-            List           l;
-            long           stop_time=System.currentTimeMillis();
-
-            synchronized(msgs) {
-                if(msgs.size() == 0)
-                    return;
-                if(start_time == 0)
-                    start_time=System.currentTimeMillis();
-
-                if(log.isTraceEnabled()) {
-                    StringBuffer sb=new StringBuffer("sending ").append(numMsgs(msgs)).append(" msgs (");
-                    sb.append(total_bytes).append(" bytes, ").append(stop_time-start_time).append("ms)");
-                    sb.append(" to ").append(msgs.size()).append(" destination(s)");
-                    if(msgs.size() > 1) sb.append(" (dests=").append(msgs.keySet()).append(")");
-                    log.trace(sb.toString());
-                }
-                for(Iterator it=msgs.entrySet().iterator(); it.hasNext();) {
-                    entry=(Map.Entry)it.next();
-                    // NOTE(review): dst is dereferenced without a null check; presumably every queued
-                    // message already carries a destination - confirm against the code that fills the queue
-                    dst=(IpAddress)entry.getKey();
-                    addr=dst.getIpAddress();
-                    port=dst.getPort();
-                    l=(List)entry.getValue();
-                    try {
-                        if(l.size() > 0) {
-                            synchronized(out_stream) {
-                                buffer=listToBuffer(l, dst);
-                                doSend(buffer, addr, port);
-                            }
-                        }
-                    }
-                    catch(Exception e) {
-                        if(log.isErrorEnabled()) log.error("exception sending msg (to dest=" + dst + ")", e);
-                    }
-                }
-                msgs.clear();
-            }
-        }
-
-        /** Returns the total number of messages over all the lists stored as values in map. */
-        private int numMsgs(HashMap map) {
-            Collection values=map.values();
-            List l;
-            int size=0;
-            for(Iterator it=values.iterator(); it.hasNext();) {
-                l=(List)it.next();
-                size+=l.size();
-            }
-            return size;
-        }
-    }
-
-
-    /**
-     * Builds a textual summary of a map whose values are message lists: one line per entry of the
-     * form "key: n msgs", where a null key is printed as "null". The map is traversed while
-     * holding its monitor.
-     *
-     * @param map the map to summarize; may be null
-     * @return the summary, which is empty when map is null or has no entries
-     */
-    String dumpMessages(HashMap map) {
-        StringBuffer summary=new StringBuffer();
-        if(map == null)
-            return summary.toString();
-
-        synchronized(map) {
-            Iterator entries=map.entrySet().iterator();
-            while(entries.hasNext()) {
-                Map.Entry current=(Map.Entry)entries.next();
-                Object    label=current.getKey() != null? current.getKey() : "null";
-                List      messages=(List)current.getValue();
-                summary.append(label).append(": ").append(messages.size()).append(" msgs\n");
-            }
-        }
-        return summary.toString();
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/obsolete/UNIFORM.java.txt
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/obsolete/UNIFORM.java.txt b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/obsolete/UNIFORM.java.txt
deleted file mode 100644
index e26f559..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/obsolete/UNIFORM.java.txt
+++ /dev/null
@@ -1,349 +0,0 @@
-// $Id: UNIFORM.java.txt,v 1.3 2005/05/30 14:31:05 belaban Exp $
-
-package org.jgroups.protocols;
-
-import java.io.Serializable;
-import java.util.Hashtable;
-import java.util.Properties;
-import java.util.Vector;
-
-
-
-/**
- * Header used by the UNIFORM protocol. It carries the kind of request or response
- * (<code>type</code>, one of the constants below) and the <code>id</code> of the multicast
- * the request or response belongs to.
- */
-class UniformHeader implements Serializable {
-    public static final int SAVE        = 0;
-    public static final int SAVE_OK     = 1;
-    public static final int DELIVER     = 2;
-    public static final int DELIVER_OK  = 3;
-    public static final int SEEN        = 4;
-    public static final int SEEN_OK     = 5;
-    public static final int SEEN_NOTOK  = 6;
-    public static final int GC          = 7;
-    public static final int GC_OK       = 8;
-
-    public int      type=-1;
-    public long     id=-1;
-    public boolean  handle=true;
-
-
-    /**
-     * Returns the symbolic name of a type constant.
-     * @param t one of the type constants of this class
-     * @return the name of the constant, or "&lt;unknown&gt;" if t is none of them
-     */
-    String type2Str(int t) {
-        switch(t) {
-        case SAVE:        return "SAVE";
-        case SAVE_OK:     return "SAVE_OK";
-        case DELIVER:     return "DELIVER";
-        case DELIVER_OK:  return "DELIVER_OK";
-        case SEEN:        return "SEEN";
-        case SEEN_OK:     return "SEEN_OK";
-        case SEEN_NOTOK:  return "SEEN_NOTOK"; // was missing, so this type was printed as <unknown>
-        case GC:          return "GC";
-        case GC_OK:       return "GC_OK";
-        default:          return "<unknown>";
-        }
-    }
-
-
-    /** Creates a header with type and id set to -1 and <code>handle</code> set to false. */
-    public UniformHeader() {handle=false;}
-
-
-    /** Creates a header of the given type whose id is the current time in milliseconds. */
-    public UniformHeader(int type) {
-        this.type=type;
-        id=System.currentTimeMillis();
-        handle=true;
-    }
-
-
-    /** Creates a header of the given type with the given id. */
-    public UniformHeader(int type, long id) {
-        this.type=type;
-        this.id=id;
-        handle=true;
-    }
-
-
-    public String toString() {return "[UNIFORM: type=" + type2Str(type) + ", id=" + id + "]";}
-}
-
-
-
-
-
-/**
-   The algorithm implements <em>dynamically-uniform failure-atomic group multicast</em>,
-   that is, a message is delivered by all members if it is delivered by at least 1
-   non-faulty member even if the sender crashes after sending. If the sender crashes, it
-   will eventually be removed from the group membership: the FLUSH protocol preceding the
-   view change causes all pending multicasts to be flushed out of the system, thereby
-   re-sending pending multicasts to members that haven't received them yet.<p>
-   The protocol makes use of <code>GroupRequest</code> (which itself uses
-   <code>RequestCorrelator</code>) to send a request to all members and receive responses from
-   all non-faulty members.
- */
-
-public class UNIFORM extends Protocol implements RequestHandler, Transport {
-    Vector              members=null;
-    boolean             trace=false;
-    RequestCorrelator   corr=new RequestCorrelator(getName(), this, this);
-    Hashtable           pending=new Hashtable();    // key = sender, val = Hashtable (msg-id, msg)
-    Hashtable           delivered=new Hashtable();  // key = sender, val = Hashtable (msg-id, msg)
-    private long        last_id=0;                  // last id handed out by nextId()
-
-
-    public String  getName() {return "UNIFORM";}
-
-
-    /**
-     * Reads the <code>trace</code> property and removes it from props.
-     * @return false if unrecognized properties are left in props, true otherwise
-     */
-    public boolean setProperties(Properties props) {
-        super.setProperties(props);
-        String     str;
-
-        this.props=props;
-        str=props.getProperty("trace");
-        if(str != null) {
-            trace=Boolean.valueOf(str).booleanValue();
-            props.remove("trace");
-        }
-        if(props.size() > 0) {
-            log.error("UNIFORM.setProperties(): the following properties are not recognized: " + props);
-            return false;
-        }
-        return true;
-    }
-
-
-    /** Just remove if you don't need to reset any state */
-    public void reset() {}
-
-
-
-
-    /**
-     * A START event starts the request correlator and is passed up; every other event is handed
-     * to the request correlator.
-     */
-    public void up(Event evt) {
-        if(evt.getType() == Event.START) {
-            corr.start();
-            passUp(evt);
-            return;
-        }
-        corr.receive(evt);
-    }
-
-
-
-
-    /**
-     * Unicast messages and all events other than STOP, TMP_VIEW, VIEW_CHANGE and MSG are passed
-     * down unchanged. A multicast message is not passed down directly: it is sent as the SAVE
-     * request of a sequence of four group requests (SAVE, DELIVER, SEEN, GC) which share one id.
-     */
-    public void down(Event evt) {
-        Message       msg;
-        GroupRequest  save_req, deliver_req, seen_req, gc_req;
-        AndCommand    and_comm;
-        Message       save_msg, deliver_msg, seen_msg, gc_msg;
-        Vector        mbrs=null;
-        long          id=0;
-
-        switch(evt.getType()) {
-        case Event.STOP:
-            // NOTE(review): capitalized Stop(), unlike corr.start() in up() - confirm against RequestCorrelator
-            corr.Stop();
-            passDown(evt);
-            break;
-
-        case Event.TMP_VIEW:
-        case Event.VIEW_CHANGE:
-            Vector tmp;
-            if((tmp=(Vector)((View)evt.getArg()).getMembers()) != null)
-                members=tmp;
-            passDown(evt);
-            break;
-
-        case Event.MSG:
-            msg=(Message)evt.getArg();
-
-            if(msg.getDest() != null) { // unicast msg
-                passDown(evt);
-                return;
-            }
-
-            // members stays null until the first view arrives; without it the receivers of the
-            // group requests are unknown (this used to fail with a NullPointerException)
-            if(members == null) {
-                log.error("UNIFORM.down(): no view has been received yet, multicast message is discarded");
-                return;
-            }
-
-            // the id must differ between the multicasts of this sender: receivers key their tables
-            // on it and ignore a message whose id is already pending
-            id=nextId();
-            mbrs=(Vector)members.clone();
-
-            /*
-              1. Create 4 group requests (SAVE, DELIVER, SEEN and GC). Each has the same
-                 unique ID, and each is tagged with its type. ID and type are contained in
-                 a UniformHeader attached to the message of each request.
-              2. Create an AndCommand and add the 4 requests.
-              3. Execute the AndCommand.
-             */
-
-            save_msg=msg;
-            save_msg.addHeader(new UniformHeader(UniformHeader.SAVE, id));
-            save_req=new GroupRequest(save_msg, corr, mbrs, GroupRequest.GET_ALL);
-
-            deliver_msg=new Message(null, null, null);
-            deliver_msg.addHeader(new UniformHeader(UniformHeader.DELIVER, id));
-            deliver_req=new GroupRequest(deliver_msg, corr, mbrs, GroupRequest.GET_ALL);
-
-            seen_msg=new Message(null, null, null);
-            seen_msg.addHeader(new UniformHeader(UniformHeader.SEEN, id));
-            seen_req=new GroupRequest(seen_msg, corr, mbrs, GroupRequest.GET_ALL);
-
-            gc_msg=new Message(null, null, null);
-            gc_msg.addHeader(new UniformHeader(UniformHeader.GC, id));
-            gc_req=new GroupRequest(gc_msg, corr, mbrs, GroupRequest.GET_ALL);
-
-            and_comm=new AndCommand();
-            and_comm.add(save_req);
-            and_comm.add(deliver_req);
-            and_comm.add(seen_req);
-            and_comm.add(gc_req);
-
-            boolean rc=and_comm.execute();
-            System.out.println("UNIFORM: rc from Execute is " + rc);
-
-            break;
-
-        default:
-            passDown(evt);  // Pass on to the layer below us
-        }
-
-    }
-
-
-    /**
-     * Returns an id that is greater than every id returned before by this instance. The current
-     * time is used when possible; if the clock has not advanced since the last call, the
-     * previous id plus one is used instead.
-     */
-    private synchronized long nextId() {
-        long now=System.currentTimeMillis();
-        last_id=now > last_id? now : last_id + 1;
-        return last_id;
-    }
-
-
-
-
-    /**
-       <pre>
-       1. Remove UniformHeader from message and get ID and type
-       2. If type == SAVE:    add message to the pending table (key=sender + ID) and return SAVE_OK
-          If type == DELIVER: move message from the pending table to the delivered table, deliver
-                              it (pass it up the stack) if it was found, and return DELIVER_OK
-          If type == SEEN:    look for the message in the delivered table. If found, return
-                              SEEN_OK, else SEEN_NOTOK
-          If type == GC:      delete message from the delivered table and return GC_OK
-       </pre>
-       @return the response code as an Integer, or null if the message carries no UniformHeader
-               or the header type is not known
-     */
-    public Object handle(Message msg) {
-        UniformHeader  hdr;
-        Object         obj=msg.peekHeader();
-        Message        m=null;
-
-        if(obj instanceof UniformHeader) {
-            hdr=(UniformHeader)msg.removeHeader();
-
-            switch(hdr.type) {
-            case UniformHeader.SAVE:
-
-                System.out.println("==> save in pending: " + msg.getSrc() + ":" + hdr.id);
-
-                saveInPending(hdr.id, msg);
-                return Integer.valueOf(UniformHeader.SAVE_OK);
-            case UniformHeader.DELIVER:
-
-                System.out.println("==> move to delivered: " + msg.getSrc() + ":" + hdr.id);
-
-                m=moveFromPendingToDelivered(msg.getSrc(), hdr.id);
-                if(m != null)
-                    passUp(new Event(Event.MSG, m));
-                return Integer.valueOf(UniformHeader.DELIVER_OK);
-            case UniformHeader.SEEN:
-
-                System.out.print("==> find in delivered: " + msg.getSrc() + ":" + hdr.id);
-
-                if(findInDelivered(msg.getSrc(), hdr.id)) {
-                    System.out.println(" SEEN_OK");
-                    return Integer.valueOf(UniformHeader.SEEN_OK);
-                }
-
-                System.out.println(" SEEN_NOTOK");
-                return Integer.valueOf(UniformHeader.SEEN_NOTOK);
-
-            case UniformHeader.GC:
-
-                System.out.println("==> remove from delivered: " + msg.getSrc() + ":" + hdr.id);
-
-                removeFromDelivered(msg.getSrc(), hdr.id);
-                return Integer.valueOf(UniformHeader.GC_OK);
-            default:
-                log.error("UNIFORM.handle(): UniformHeader.type " + hdr.type + " not known");
-                break;
-            }
-        }
-
-        return null;
-    }
-
-
-
-    /* --------------------------- Transport interface ---------------------------------- */
-    public void    send(Message msg) throws Exception {passDown(new Event(Event.MSG, msg));}
-    public Object  receive(long timeout) throws Exception {return null;}
-    /* ------------------------ End of Transport interface ------------------------------ */
-
-
-
-    /**
-     * Stores msg in the pending table under its sender and msg_id. A message that is already
-     * stored under the same sender and id is kept; the new one is ignored.
-     */
-    void saveInPending(long msg_id, Message msg) {
-        Object     sender=msg.getSrc();
-        Long       key=Long.valueOf(msg_id);
-        Hashtable  val=(Hashtable)pending.get(sender); // look for sender as key
-
-        if(val == null) {
-            val=new Hashtable();
-            pending.put(sender, val);
-        }
-        if(!val.containsKey(key))
-            val.put(key, msg);
-    }
-
-
-
-    /**
-     * Moves the message stored for sender and msg_id from the pending table to the delivered
-     * table. Per-sender tables are created on demand and removed from the pending table once empty.
-     * @return the moved message, or null (after logging an error) if it is not pending
-     */
-    Message moveFromPendingToDelivered(Object sender, long msg_id) {
-        Message    msg=null;
-        Hashtable  val_pending, val_delivered;
-        Long       key=Long.valueOf(msg_id);
-
-        val_pending=(Hashtable)pending.get(sender);
-        if(val_pending == null) {
-            log.error("UNIFORM.moveFromPendingToDelivered(): value for " +
-                      sender + " not found !");
-            return null;
-        }
-        msg=(Message)val_pending.get(key);
-        if(msg == null) {
-            log.error("UNIFORM.moveFromPendingToDelivered(): value for " + sender + ":" +
-                      key + " not found !");
-            return null;
-        }
-
-        val_delivered=(Hashtable)delivered.get(sender);
-        if(val_delivered == null) {
-            val_delivered=new Hashtable();
-            delivered.put(sender, val_delivered);
-        }
-        if(!val_delivered.containsKey(key))
-            val_delivered.put(key, msg);
-        val_pending.remove(key);               // remove from pending table
-        if(val_pending.size() == 0)
-            pending.remove(sender);
-
-        return msg;
-    }
-
-
-
-
-    /** Returns true if the delivered table holds a message for sender and msg_id. */
-    boolean findInDelivered(Object sender, long msg_id) {
-        Hashtable val=(Hashtable)delivered.get(sender);
-        if(val == null)
-            return false;
-        return val.containsKey(Long.valueOf(msg_id));
-    }
-
-
-
-    /**
-     * Removes the message for sender and msg_id from the delivered table, and the sender's
-     * table as well once it is empty. Does nothing if nothing is stored for sender.
-     */
-    void removeFromDelivered(Object sender, long msg_id) {
-        Hashtable val=(Hashtable)delivered.get(sender);
-        if(val == null)
-            return;
-        val.remove(Long.valueOf(msg_id));
-        if(val.size() == 0)
-            delivered.remove(sender);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/package.html
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/package.html b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/package.html
deleted file mode 100644
index 8aeeb29..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/package.html
+++ /dev/null
@@ -1,6 +0,0 @@
-<HTML>
-	<BODY>
-		Provides implementations of transport protocols which are 
-		responsible for sending and receiving messages to/from the network. 
-	</BODY>
-</HTML>
\ No newline at end of file


Mime
View raw message