geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject [21/51] [partial] incubator-geode git commit: GEODE-77 removing the old jgroups subproject
Date Fri, 21 Aug 2015 21:22:45 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/STATS.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/STATS.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/STATS.java
deleted file mode 100644
index 35dccca..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/STATS.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.View;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-
-import java.util.*;
-
-/**
- * Provides various stats
- * @author Bela Ban
- * @version $Id: STATS.java,v 1.2 2005/06/07 10:17:27 belaban Exp $
- */
-public class STATS extends Protocol  {
-    long sent_msgs, sent_bytes, sent_ucasts, sent_mcasts, received_ucasts, received_mcasts;
-    long received_msgs, received_bytes, sent_ucast_bytes, sent_mcast_bytes, received_ucast_bytes, received_mcast_bytes;
-
-    /** HashMap key=Address, value=Entry, maintains stats per target destination */
-    HashMap sent=new HashMap();
-
-    /** HashMap key=Address, value=Entry, maintains stats per receiver */
-    HashMap received=new HashMap();
-
-    static/*GemStoneAddition*/ final short UP=1;
-    static/*GemStoneAddition*/ final short DOWN=2;
-
-
-    @Override // GemStoneAddition  
-    public String getName() {
-        return "STATS";
-    }
-
-    @Override // GemStoneAddition  
-    public boolean setProperties(Properties props) {
-        super.setProperties(props);
-        down_thread=false; // never use a down thread
-        up_thread=false;   // never use an up thread
-
-        if(props.size() > 0) {
-            log.error(ExternalStrings.STATS_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-            return false;
-        }
-        return true;
-    }
-
-    @Override // GemStoneAddition  
-    public void resetStats() {
-        sent_msgs=sent_bytes=sent_ucasts=sent_mcasts=received_ucasts=received_mcasts=0;
-        received_msgs=received_bytes=sent_ucast_bytes=sent_mcast_bytes=received_ucast_bytes=received_mcast_bytes=0;
-        sent.clear();
-        received.clear();
-    }
-
-
-    public long getSentMessages() {return sent_msgs;}
-    public long getSentBytes() {return sent_bytes;}
-    public long getSentUnicastMessages() {return sent_ucasts;}
-    public long getSentUnicastBytes() {return sent_ucast_bytes;}
-    public long getSentMcastMessages() {return sent_mcasts;}
-    public long getSentMcastBytes() {return sent_mcast_bytes;}
-
-    public long getReceivedMessages() {return received_msgs;}
-    public long getReceivedBytes() {return received_bytes;}
-    public long getReceivedUnicastMessages() {return received_ucasts;}
-    public long getReceivedUnicastBytes() {return received_ucast_bytes;}
-    public long getReceivedMcastMessages() {return received_mcasts;}
-    public long getReceivedMcastBytes() {return received_mcast_bytes;}
-
-
-    @Override // GemStoneAddition  
-    public void up(Event evt) {
-        if(evt.getType() == Event.MSG) {
-            Message msg=(Message)evt.getArg();
-            updateStats(msg, UP);
-        }
-        else if(evt.getType() == Event.VIEW_CHANGE) {
-            handleViewChange((View)evt.getArg());
-        }
-        passUp(evt);
-    }
-
-
-
-    @Override // GemStoneAddition  
-    public void down(Event evt) {
-        if(evt.getType() == Event.MSG) {
-            Message msg=(Message)evt.getArg();
-            updateStats(msg, DOWN);
-        }
-        else if(evt.getType() == Event.VIEW_CHANGE) {
-            handleViewChange((View)evt.getArg());
-        }
-        passDown(evt);
-    }
-
-
-    @Override // GemStoneAddition  
-    public String printStats() {
-        Map.Entry entry;
-        Object key, val;
-        StringBuffer sb=new StringBuffer();
-        sb.append("sent:\n");
-        for(Iterator it=sent.entrySet().iterator(); it.hasNext();) {
-            entry=(Map.Entry)it.next();
-            key=entry.getKey();
-            if(key == null) key="<mcast dest>";
-            val=entry.getValue();
-            sb.append(key).append(": ").append(val).append("\n");
-        }
-        sb.append("\nreceived:\n");
-        for(Iterator it=received.entrySet().iterator(); it.hasNext();) {
-            entry=(Map.Entry)it.next();
-            key=entry.getKey();
-            val=entry.getValue();
-            sb.append(key).append(": ").append(val).append("\n");
-        }
-
-        return sb.toString();
-    }
-
-    private void handleViewChange(View view) {
-        Vector members=view.getMembers();
-        Set tmp=new LinkedHashSet(members);
-        tmp.add(null); // for null destination (= mcast)
-        sent.keySet().retainAll(tmp);
-        received.keySet().retainAll(tmp);
-    }
-
-    private void updateStats(Message msg, short direction) {
-        int     length;
-        HashMap map;
-        boolean mcast;
-        Address dest, src;
-
-        if(msg == null) return;
-        length=msg.getLength();
-        dest=msg.getDest();
-        src=msg.getSrc();
-        mcast=dest == null || dest.isMulticastAddress();
-
-        if(direction == UP) { // received
-            received_msgs++;
-            received_bytes+=length;
-            if(mcast) {
-                received_mcasts++;
-                received_mcast_bytes+=length;
-            }
-            else {
-                received_ucasts++;
-                received_ucast_bytes+=length;
-            }
-        }
-        else {                // sent
-            sent_msgs++;
-            sent_bytes+=length;
-            if(mcast) {
-                sent_mcasts++;
-                sent_mcast_bytes+=length;
-            }
-            else {
-                sent_ucasts++;
-                sent_ucast_bytes+=length;
-            }
-        }
-
-        Address key=direction == UP? src : dest;
-        map=direction == UP? received : sent;
-        Entry entry=(Entry)map.get(key);
-        if(entry == null) {
-            entry=new Entry();
-            map.put(key, entry);
-        }
-        entry.msgs++;
-        entry.bytes+=length;
-        if(mcast) {
-            entry.mcasts++;
-            entry.mcast_bytes+=length;
-        }
-        else {
-            entry.ucasts++;
-            entry.ucast_bytes+=length;
-        }
-    }
-
-
-
-
-    static class Entry  {
-        long msgs, bytes, ucasts, mcasts, ucast_bytes, mcast_bytes;
-
-        @Override // GemStoneAddition  
-        public String toString() {
-            StringBuffer sb=new StringBuffer();
-            sb.append(msgs).append(" (").append(bytes).append(" bytes)");
-            sb.append(": ").append(ucasts).append(" ucasts (").append(ucast_bytes).append(" bytes), ");
-            sb.append(mcasts).append(" mcasts (").append(mcast_bytes).append(" bytes)");
-            return sb.toString();
-        }
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP.java
deleted file mode 100644
index cf50d36..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: TCP.java,v 1.31 2005/09/29 12:24:37 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.SuspectMember;
-import com.gemstone.org.jgroups.blocks.ConnectionTable;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.util.BoundedList;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Properties;
-import java.util.Vector;
-
-
-
-
-/**
- * TCP based protocol. Creates a server socket, which gives us the local address of this group member. For
- * each accept() on the server socket, a new thread is created that listens on the socket.
- * For each outgoing message m, if m.dest is in the ougoing hashtable, the associated socket will be reused
- * to send message, otherwise a new socket is created and put in the hashtable.
- * When a socket connection breaks or a member is removed from the group, the corresponding items in the
- * incoming and outgoing hashtables will be removed as well.<br>
- * This functionality is in ConnectionTable, which isT used by TCP. TCP sends messages using ct.send() and
- * registers with the connection table to receive all incoming messages.
- * @author Bela Ban
- */
-public class TCP extends TP implements ConnectionTable.Receiver {
-    private ConnectionTable ct=null;
-    private InetAddress	    external_addr=null; // the IP address which is broadcast to other group members
-    private int             start_port=7800;    // find first available port starting at this port
-    private int	            end_port=0;         // maximum port to bind to
-    private long            reaper_interval=0;  // time in msecs between connection reaps
-    private long            conn_expire_time=0; // max time a conn can be idle before being reaped
-
-    /** List the maintains the currently suspected members. This is used so we don't send too many SUSPECT
-     * events up the stack (one per message !)
-     */
-    final BoundedList      suspected_mbrs=new BoundedList(20);
-
-    /** Should we drop unicast messages to suspected members or not */
-    boolean                skip_suspected_members=true;
-
-    /** Use separate send queues for each connection */
-    boolean                use_send_queues=true;
-
-    int                    recv_buf_size=150000;
-    int                    send_buf_size=150000;
-    int                    sock_conn_timeout=2000; // max time in millis for a socket creation in ConnectionTable
-
-
-
-    public TCP() {
-    }
-
-    @Override // GemStoneAddition
-    public String getName() {
-        return "TCP";
-    }
-
-
-    public int getOpenConnections()      {return ct.getNumConnections();}
-    public InetAddress getBindAddr() {return bind_addr;}
-    public void setBindAddr(InetAddress bind_addr) {this.bind_addr=bind_addr;}
-    public int getStartPort() {return start_port;}
-    public void setStartPort(int start_port) {this.start_port=start_port;}
-    public int getEndPort() {return end_port;}
-    public void setEndPort(int end_port) {this.end_port=end_port;}
-    public long getReaperInterval() {return reaper_interval;}
-    public void setReaperInterval(long reaper_interval) {this.reaper_interval=reaper_interval;}
-    public long getConnExpireTime() {return conn_expire_time;}
-    public void setConnExpireTime(long conn_expire_time) {this.conn_expire_time=conn_expire_time;}
-    @Override // GemStoneAddition
-    public boolean isLoopback() {return loopback;}
-    @Override // GemStoneAddition
-    public void setLoopback(boolean loopback) {this.loopback=loopback;}
-
-
-    public String printConnections()     {return ct.toString();}
-
-
-    /** Setup the Protocol instance acording to the configuration string */
-    @Override // GemStoneAddition
-    public boolean setProperties(Properties props) {
-        String str;
-
-        super.setProperties(props);
-        str=props.getProperty("start_port");
-        if(str != null) {
-            start_port=Integer.parseInt(str);
-            props.remove("start_port");
-        }
-
-        str=props.getProperty("end_port");
-        if(str != null) {
-            end_port=Integer.parseInt(str);
-            props.remove("end_port");
-        }
-
-        str=props.getProperty("external_addr");
-        if(str != null) {
-            try {
-                external_addr=InetAddress.getByName(str);
-            }
-            catch(UnknownHostException unknown) {
-                if(log.isFatalEnabled()) log.fatal("(external_addr): host " + str + " not known");
-                return false;
-            }
-            props.remove("external_addr");
-        }
-
-        str=props.getProperty("reaper_interval");
-        if(str != null) {
-            reaper_interval=Long.parseLong(str);
-            props.remove("reaper_interval");
-        }
-
-        str=props.getProperty("conn_expire_time");
-        if(str != null) {
-            conn_expire_time=Long.parseLong(str);
-            props.remove("conn_expire_time");
-        }
-
-        str=props.getProperty("sock_conn_timeout");
-        if(str != null) {
-            sock_conn_timeout=Integer.parseInt(str);
-            props.remove("sock_conn_timeout");
-        }
-
-        str=props.getProperty("recv_buf_size");
-        if(str != null) {
-            recv_buf_size=Integer.parseInt(str);
-            props.remove("recv_buf_size");
-        }
-
-        str=props.getProperty("send_buf_size");
-        if(str != null) {
-            send_buf_size=Integer.parseInt(str);
-            props.remove("send_buf_size");
-        }
-
-        str=props.getProperty("skip_suspected_members");
-        if(str != null) {
-            skip_suspected_members=Boolean.valueOf(str).booleanValue();
-            props.remove("skip_suspected_members");
-        }
-
-        str=props.getProperty("use_send_queues");
-        if(str != null) {
-            use_send_queues=Boolean.valueOf(str).booleanValue();
-            props.remove("use_send_queues");
-        }
-
-        if(props.size() > 0) {
-            log.error(ExternalStrings.TCP_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-            return false;
-        }
-        return true;
-    }
-
-
-    @Override // GemStoneAddition
-    public void start() throws Exception {
-        ct=getConnectionTable(reaper_interval,conn_expire_time,bind_addr,external_addr,start_port,end_port);
-        ct.setUseSendQueues(use_send_queues);
-        // ct.addConnectionListener(this);
-        ct.setReceiveBufferSize(recv_buf_size);
-        ct.setSendBufferSize(send_buf_size);
-        ct.setSocketConnectionTimeout(sock_conn_timeout);
-        local_addr=ct.getLocalAddress();
-        if(additional_data != null && local_addr instanceof IpAddress)
-            ((IpAddress)local_addr).setAdditionalData(additional_data);
-        super.start();
-    }
-
-    @Override // GemStoneAddition
-    public void stop() {
-        ct.stop();
-        super.stop();
-    }
-
-
-    @Override // GemStoneAddition
-    protected void handleDownEvent(Event evt) {
-        super.handleDownEvent(evt);
-        if(evt.getType() == Event.VIEW_CHANGE) {
-            suspected_mbrs.removeAll();
-        }
-        else if(evt.getType() == Event.UNSUSPECT) {
-            suspected_mbrs.removeElement(evt.getArg());
-        }
-    }
-    
-
-   /**
-    * @param reaperInterval
-    * @param connExpireTime
-    * @param bindAddress
-    * @param startPort
-    * @throws Exception
-    * @return ConnectionTable
-    * Sub classes overrides this method to initialize a different version of
-    * ConnectionTable.
-    */
-   protected ConnectionTable getConnectionTable(long reaperInterval, long connExpireTime, InetAddress bindAddress,
-                                                InetAddress externalAddress, int startPort, int endPort) throws Exception {
-       ConnectionTable cTable;
-       if(reaperInterval == 0 && connExpireTime == 0) {
-           cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort);
-       }
-       else {
-           if(reaperInterval == 0) {
-               reaperInterval=5000;
-               if(warn) log.warn("reaper_interval was 0, set it to " + reaperInterval);
-           }
-           if(connExpireTime == 0) {
-               connExpireTime=1000 * 60 * 5;
-               if(warn) log.warn("conn_expire_time was 0, set it to " + connExpireTime);
-           }
-           cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort, 
-                                      reaperInterval, connExpireTime);
-       }
-       return cTable;
-   }
-
-
-    /** ConnectionTable.Receiver interface */
-    public void receive(Address sender, byte[] data, int offset, int length) {
-        super.receive(local_addr, sender, data, offset, length);
-    }
-
-
-
-
-    @Override // GemStoneAddition
-    public void sendToAllMembers(byte[] data, int offset, int length) throws Exception {
-        Address dest;
-        Vector mbrs=(Vector)members.clone();
-        for(int i=0; i < mbrs.size(); i++) {
-            dest=(Address)mbrs.elementAt(i);
-            sendToSingleMember(dest, false, data, offset, length);
-        }
-    }
-
-    @Override // GemStoneAddition
-    public void sendToSingleMember(Address dest, boolean isJoinResponse/*temporary change - do not commit*/, byte[] data, int offset, int length) throws Exception {
-        if(trace) log.trace("dest=" + dest + " (" + data.length + " bytes)");
-        if(skip_suspected_members) {
-            if(suspected_mbrs.contains(dest)) {
-                if(trace)
-                    log.trace("will not send unicast message to " + dest + " as it is currently suspected");
-                return;
-            }
-        }
-
-//        if(dest.equals(local_addr)) {
-//            if(!loopback) // if loopback, we discard the message (was already looped back)
-//                receive(dest, data, offset, length); // else we loop it back here
-//            return;
-//        }
-        try {
-            ct.send(dest, data, offset, length);
-        }
-        catch(Exception e) {
-            if(members.contains(dest)) {
-                if(!suspected_mbrs.contains(dest)) {
-                    suspected_mbrs.add(dest);
-                    passUp(new Event(Event.SUSPECT, new SuspectMember(local_addr, dest))); // GemStoneAddition SuspectMember
-                }
-            }
-        }
-    }
-
-
-    @Override // GemStoneAddition
-    public String getInfo() {
-        StringBuffer sb=new StringBuffer();
-        sb.append("connections: ").append(printConnections()).append("\n");
-        return sb.toString();
-    }
-
-
-    @Override // GemStoneAddition
-    public void postUnmarshalling(Message msg, Address dest, Address src, boolean multicast) {
-        if(multicast)
-            msg.setDest(null);
-        else
-            msg.setDest(dest);
-    }
-
-    @Override // GemStoneAddition
-    public void postUnmarshallingList(Message msg, Address dest, boolean multicast) {
-        postUnmarshalling(msg, dest, null, multicast);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPGOSSIP.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPGOSSIP.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPGOSSIP.java
deleted file mode 100644
index 91650af..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPGOSSIP.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: TCPGOSSIP.java,v 1.16 2005/08/11 12:43:47 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Properties;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.Vector;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.JChannel;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.stack.GossipClient;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-
-
-/**
- * The TCPGOSSIP protocol layer retrieves the initial membership (used by the GMS when started
- * by sending event FIND_INITIAL_MBRS down the stack).
- * We do this by contacting one or more GossipServers, which must be running at well-known
- * addresses:ports. The responses should allow us to determine the coordinator whom we have to
- * contact, e.g. in case we want to join the group.  When we are a server (after having
- * received the BECOME_SERVER event), we'll respond to TCPGOSSIP requests with a TCPGOSSIP
- * response.<p> The FIND_INITIAL_MBRS event will eventually be answered with a
- * FIND_INITIAL_MBRS_OK event up the stack.
- *
- * @author Bela Ban
- */
-public class TCPGOSSIP extends Discovery  {
-    Vector initial_hosts=null;  // (list of IpAddresses) hosts to be contacted for the initial membership
-    GossipClient gossip_client=null;  // accesses the GossipServer(s) to find initial mbrship
-
-    // we need to refresh the registration with the GossipServer(s) periodically,
-    // so that our entries are not purged from the cache
-    long gossip_refresh_rate=20000;
-    
-    private boolean splitBrainDetectionEnabled; // GemStoneAddition
-    private int gossipServerWaitTime; // GemStoneAddition
-
-    final static Vector EMPTY_VECTOR=new Vector();
-    final static String name="TCPGOSSIP";
-
-
-    @Override // GemStoneAddition
-    public String getName() {
-        return name;
-    }
-
-    // start GemStoneAddition
-    @Override // GemStoneAddition
-    public int getProtocolEnum() {
-      return com.gemstone.org.jgroups.stack.Protocol.enumTCPGOSSIP;
-    }
-    // end GemStone addition
-
-    @Override // GemStoneAddition
-    public boolean setProperties(Properties props) {
-        String str;
-        str=props.getProperty("gossip_refresh_rate");  // wait for at most n members
-        if(str != null) {
-            gossip_refresh_rate=Integer.parseInt(str);
-            props.remove("gossip_refresh_rate");
-        }
-
-        //GemStoneAddition - split-brain detection support
-        str=props.getProperty("split-brain-detection");
-        if (str != null) {
-          splitBrainDetectionEnabled = Boolean.valueOf(str).booleanValue();
-          props.remove("split-brain-detection");
-        }
-
-        str=props.getProperty("initial_hosts");
-        if(str != null) {
-            props.remove("initial_hosts");
-            initial_hosts=createInitialHosts(str);
-        }
-        
-        str = props.getProperty("gossip_server_wait_time");
-        if (str != null) {
-          props.remove("gossip_server_wait_time");
-          gossipServerWaitTime = Integer.parseInt(str);
-        }
-
-        if(initial_hosts == null || initial_hosts.size() == 0) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.TCPGOSSIP_INITIAL_HOSTS_MUST_CONTAIN_THE_ADDRESS_OF_AT_LEAST_ONE_GOSSIPSERVER);
-            return false;
-        }
-        return super.setProperties(props);
-    }
-
-
-
-    @Override // GemStoneAddition
-    public void start() throws Exception {
-        super.start();
-        if(gossip_client == null) {
-            gossip_client=new GossipClient(initial_hosts, gossip_refresh_rate, this.stack);
-            gossip_client.setTimeout((int)this.timeout);
-        }
-    }
-
-    @Override // GemStoneAddition
-    public void stop() {
-        super.stop();
-        if(gossip_client != null) {
-            gossip_client.stop();
-            //gossip_client=null;
-        }
-    }
-
-
-    @Override // GemStoneAddition
-    public void handleConnectOK() {
-        if(group_addr == null || local_addr == null) {
-            if(log.isErrorEnabled())
-                log.error("[CONNECT_OK]: group_addr or local_addr is null. " +
-                          "cannot register with GossipServer(s)");
-        }
-        else {
-            gossip_client.register(group_addr, local_addr, timeout, true);  // GemStone - timeout, stack & inhibit registration
-        }
-    }
-
-
-    private boolean ipWarningIssued; // GemStoneAddition - IP version checking
-
-    @Override // GemStoneAddition
-    public void sendGetMembersRequest(AtomicBoolean waiter_sync) { // GemStoneAddition - both parameters
-        Message msg, copy;
-        PingHeader hdr;
-        Vector tmp_mbrs;
-        Address mbr_addr;
-        GossipClient client = gossip_client; // GemStoneAddition - gossip_client gets nulled when this proto is stopped
-
-        // bug #41484 - only use coordinator advice from the gossip server once
-        boolean shortcutOK = !this.stack.hasTriedJoinShortcut();
-
-        if(group_addr == null) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.TCPGOSSIP_FIND_INITIAL_MBRS_GROUP_ADDR_IS_NULL_CANNOT_GET_MBRSHIP);
-            passUp(new Event(Event.FIND_INITIAL_MBRS_OK, EMPTY_VECTOR));
-            return;
-        }
-        if(trace) log.trace("fetching members from GossipServer(s)");
-
-        // GemStoneAddition - bug 28965: don't allow startup if no gossip server
-        boolean isAdminOnly = stack.gfPeerFunctions.isAdminOnlyMember();
-        //do {  GemStone - see comment below
-        
-          if (gossip_client == null)
-            return;
-
-          long giveUpTime = System.currentTimeMillis() + (this.gossipServerWaitTime * 1000L);
-
-          tmp_mbrs=client.getMembers(group_addr, local_addr, true, this.timeout); // GemStoneAddition - send local addr on get
-          
-          boolean firstWait = true;
-          boolean startupStatusWaitingSet = false;
-          
-//          if (isAdminOnly) { // GemStoneAddition - this if-else block added
-            while (gossip_client != null && client.getResponsiveServerCount() == 0 ||  tmp_mbrs == null || tmp_mbrs.size() == 0) {
-              // Wait, until we can contact at least one of our
-              // gossip servers and it had someone register with it
-              if (!isAdminOnly  &&  System.currentTimeMillis() >= giveUpTime) {
-                break;
-              }
-              if (firstWait) {
-                StringBuilder sb = new StringBuilder(100);
-                for (Object obj: this.initial_hosts) {
-                  if (!firstWait) {
-                    sb.append(',');
-                  }
-                  firstWait = false;
-                  IpAddress addr = (IpAddress)obj;
-                  sb.append(addr.getIpAddress().getHostName())
-                    .append('[')
-                    .append(addr.getPort())
-                    .append(']');
-                }
-                // inform gfsh / ServerLauncher
-                startupStatusWaitingSet = true;
-                stack.gfPeerFunctions.logStartup(ExternalStrings.WAITING_FOR_LOCATOR_TO_START,sb.toString());
-              }
-              try {
-                Thread.sleep(1000);
-              } catch (InterruptedException ignore) {
-                Thread.currentThread().interrupt(); // GemStoneAddition
-                return; // GemStoneAddition
-              }
-              tmp_mbrs=client.getMembers(group_addr, local_addr, true, timeout);
-              // GemStone Addition 08-04-04
-              // if the VM is exiting, return so that the distributed system
-              // sync can be released and the shutdown hook can do its job
-              if (stack.gfPeerFunctions.shutdownHookIsAlive()) {
-                throw stack.gfBasicFunctions.getGemFireConfigException("Unable to contact a Locator service before detecting that VM is exiting");
-              }
-            }
-//          } else {
-//            if (gossip_client == null) GemStoneAddition (this is never null)
-//              return;
-            if (client.getResponsiveServerCount() == 0) {
-              RuntimeException re = stack.gfBasicFunctions.getGemFireConfigException("Unable to contact a Locator service.  Operation either timed out or Locator does not exist.  Configured list of locators is \"" + initial_hosts + "\".");
-              throw re;
-            }
-//          }
-
-            if (startupStatusWaitingSet) {
-              stack.gfPeerFunctions.logStartup(ExternalStrings.WAITING_FOR_LOCATOR_TO_START_COMPLETED);
-            }
-        Set<Address> serverAddresses = client.getServerAddresses();
-        
-        if (client.getFloatingCoordinatorDisabled()) {
-          passUp(new Event(Event.FLOATING_COORDINATOR_DISABLED, null));
-        }
-        
-        if (client.getNetworkPartitionDetectionEnabled() != splitBrainDetectionEnabled) {
-          if (!splitBrainDetectionEnabled) {
-            splitBrainDetectionEnabled = true;
-            passUp(new Event(Event.ENABLE_NETWORK_PARTITION_DETECTION));
-          } else {
-            throw stack.gfBasicFunctions.getGemFireConfigException("Locator has enable-network-partition-detection="
-              + client.getNetworkPartitionDetectionEnabled()
-              +" but this member has enable-network-partition-detection="
-              + splitBrainDetectionEnabled);
-          }
-        }
-        
-        if (client.getNetworkPartitionDetectionEnabled()) {
-          stack.gfBasicFunctions.checkDisableDNS();
-        }
-          
-
-        // GemStoneAddition for bug 39220 see if we're using an incompatible
-        // version of IP
-        if (tmp_mbrs != null && !ipWarningIssued) {
-          TP protocol = (TP)stack.findProtocol("UDP");
-          if (protocol == null) protocol = (TP)stack.findProtocol("TCP");
-          InetAddress bindAddress = protocol.getInetBindAddress();
-          if (bindAddress != null) {
-            boolean iAmIPv4 = (bindAddress instanceof Inet4Address);
-            for (int i=0; i<tmp_mbrs.size(); i++) {
-              IpAddress addr = (IpAddress)tmp_mbrs.get(i);
-              InetAddress iaddr = addr.getIpAddress();
-              if (iAmIPv4 != (iaddr instanceof Inet4Address)) {
-                // incompatible addresses are being used
-                log.getLogWriter().warning(
-                  ExternalStrings.TCPGOSSIP_IP_VERSION_MISMATCH);
-                ipWarningIssued = true;
-                break;
-              }
-            }
-          }
-        }
-        
-        serverAddresses.remove(this.local_addr);
-        this.ping_waiter.setRequiredResponses(serverAddresses);
-
-        // GemStoneAddition - if no locators have distributed systems,
-        // tell the GMS that it's okay for it to become a coordinator  
-        if (client.getServerDistributedSystemCount() == 0) {
-          passUp(new Event(Event.ENABLE_INITIAL_COORDINATOR, null));
-        }
-        
-        // GemStoneAddition - shortcut the get_mbrs phase
-        if (shortcutOK) {
-          Address coordinator = client.getCoordinator();
-          // if this is a Locator starting up and there are no other processes
-          // in the system we can bypass discovery
-
-          // disabled: this allows a locator that's starting up to ignore concurrently
-          // starting locators.  bug #30341 is fixed by requiring responses from
-          // all known locators during discovery, and this code messes that up
-//          if (coordinator == null && Locator.hasLocators()
-//              && tmp_mbrs.size() == 0
-//              || (tmp_mbrs.size() == 1 && tmp_mbrs.get(0).equals(this.local_addr))) {
-//            coordinator = this.local_addr;
-//          }
-          if (coordinator != null) {
-            if (log.getLogWriter().fineEnabled()) {
-              log.getLogWriter().fine("Locator returned coordinator " + coordinator +
-              ", so bypassing unicast discovery processing");
-            }
-            ping_waiter.setCoordinator(coordinator);
-            wakeWaiter(waiter_sync);
-            return;
-          }
-        }
-          
-        if(tmp_mbrs == null || tmp_mbrs.size() == 0) {
-            if(trace) log.trace("[FIND_INITIAL_MBRS]: gossip client found no members");
-            passUp(new Event(Event.FIND_INITIAL_MBRS_OK, EMPTY_VECTOR));
-            wakeWaiter(waiter_sync); // GemStoneAddition
-            return;
-        }
-        if(trace) {
-          log.trace("consolidated mbrs from GossipServer(s) are " + tmp_mbrs
-              + ".  Locator distributed system count=" + client.getServerDistributedSystemCount()
-              + ", and floatingCoordinationDisabled="+client.getFloatingCoordinatorDisabled());
-        }
-
-        // GemStoneAddition - forces us to not get any initial member responses & tests the
-        // disable_initial_coordinator setting
-        //if (true) {
-        //  log.info("DEBUG: not sending GET_MBRS_REQ message to list returned by gossip server");
-        //  return;
-        //}
-
-        // 1. 'Mcast' GET_MBRS_REQ message
-        hdr=new PingHeader(PingHeader.GET_MBRS_REQ, null);
-        msg=new Message(null, null, null);
-        msg.putHeader(name, hdr);
-        //GemStoneAddition - don't bundle this message or we might time out
-        // before it's even sent
-        msg.bundleable = false;
-
-        wakeWaiter(waiter_sync); // GemStoneAddition
-        
-        // GemStoneAddition - here we send the request to newer members first
-        // since they're likely to be around.
-        int max_msgs = Integer.getInteger("gemfire.max_ping_requests", 40).intValue();
-        int msgs_sent = 0;
-        for(int i=tmp_mbrs.size()-1; i >= 0; i--) {
-            mbr_addr=(Address)tmp_mbrs.elementAt(i);
-            // make sure all required responders get the message
-            if (!serverAddresses.contains(mbr_addr)  &&  (msgs_sent >= max_msgs)) {
-              continue;
-            }
-            copy=msg.copy();
-            copy.setDest(mbr_addr);
-            if(trace) log.trace("[FIND_INITIAL_MBRS] sending PING request to " + copy.getDest());
-            passDown(new Event(Event.MSG, copy));
-            if (Thread.currentThread().isInterrupted()) {
-              break;
-            }
-            msgs_sent++;
-        }
-          
-          // GemStoneAddition - not really an addition, just a note from Bruce
-          // that this used to have a wait-for-initial-members section that is
-          // now gone, making the loop a bit difficult to implement
-        //} while (isAdminOnly && initial_members.size() <= 0);
-        
-    }
-
-
-
-    /* -------------------------- Private methods ---------------------------- */
-
-
-    /**
-     * Input is "daddy[8880],sindhu[8880],camille[5555]. Return list of IpAddresses
-     */
-    public static Vector createInitialHosts(String l) {
-        Vector tmp=new Vector();
-        String host;
-        int port;
-        IpAddress addr;
-        StringTokenizer tok=new StringTokenizer(l, ",");
-        String t;
-        boolean isLoopback = false;
-        InetAddress myAddress = null;
-        
-        String bindAddress = System.getProperty("gemfire.jg-bind-address");
-        try {
-          if (bindAddress == null) {
-            isLoopback = JChannel.getGfFunctions().getLocalHost().isLoopbackAddress(); 
-          } else {
-            isLoopback = InetAddress.getByName(bindAddress).isLoopbackAddress();
-          }
-        } catch (UnknownHostException e) {
-          // ignore
-        }
-        
-
-
-        while(tok.hasMoreTokens()) {
-            try {
-                t=tok.nextToken();
-                host=t.substring(0, t.indexOf('['));
-                // GemStoneAddition - support for name:bind-addr[port] format
-                int idx = host.lastIndexOf('@');
-                if (idx < 0) {
-                  idx = host.lastIndexOf(':');
-                }
-                String h = host.substring(0, idx > -1 ? idx : host.length());
-                if (h.indexOf(':') >= 0) { // a single numeric ipv6 address
-                  idx = host.lastIndexOf('@');
-                }
-                if (idx >= 0) {
-                  host = host.substring(idx+1, host.length());
-                }
-                port=Integer.parseInt(t.substring(t.indexOf('[') + 1, t.indexOf(']')));
-                addr=new IpAddress(host, port);
-                if (isLoopback && !addr.getIpAddress().isLoopbackAddress()) { // GemStoneAddition
-                  // TODO this should be a GemFireConfigException but that class isn't available
-                  // in a static method in the jgroups project
-                  throw new RuntimeException("This process is attempting to join with a loopback address ("+myAddress+") using a locator that does not have a local address ("+addr.getIpAddress()+").  On Unix this usually means that /etc/hosts is misconfigured.");
-                }
-                tmp.addElement(addr);
-            }
-            catch(NumberFormatException e) {
-                //if(log.isErrorEnabled()) log.error(JGroupsStrings.TCPGOSSIP_EXEPTION_IS__0, e);
-            }
-        }
-
-        return tmp;
-    }
-    
-    @Override // GemStoneAddition
-    public void destroy() { // GemStoneAddition - get rid of gossip timer
-      if (gossip_client != null) {
-        gossip_client.destroy();
-        gossip_client = null;
-      }
-    }
-
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPPING.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPPING.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPPING.java
deleted file mode 100644
index 6459d49..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPPING.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: TCPPING.java,v 1.24 2005/08/11 12:43:47 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.StringTokenizer;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-
-
-/**
- * The TCPPING protocol layer retrieves the initial membership in answer to the GMS's
- * FIND_INITIAL_MBRS event. The initial membership is retrieved by directly contacting other group
- * members, sending point-to-point mebership requests. The responses should allow us to determine
- * the coordinator whom we have to contact in case we want to join the group. When we are a server
- * (after having received the BECOME_SERVER event), we'll respond to TCPPING requests with a TCPPING
- * response.
- * <p>
- * The FIND_INITIAL_MBRS event will eventually be answered with a FIND_INITIAL_MBRS_OK event up
- * the stack.
- * <p>
- * The TCPPING protocol requires a static conifiguration, which assumes that you to know in advance
- * where to find other members of your group. For dynamic discovery, use the PING protocol, which
- * uses multicast discovery, or the TCPGOSSIP protocol, which contacts a Gossip Router to acquire
- * the initial membership.
- *
- * @author Bela Ban
- */
-public class TCPPING extends Discovery  {
-    int             port_range=1;        // number of ports to be probed for initial membership
-
-    /** List of IpAddress */
-    ArrayList       initial_hosts=null;  // hosts to be contacted for the initial membership
-    final static String name="TCPPING";
-
-
-
-    @Override // GemStoneAddition
-    public String getName() {
-        return name;
-    }
-
-
-    @Override // GemStoneAddition
-    public boolean setProperties(Properties props) {
-        String str;
-
-        str=props.getProperty("port_range");           // if member cannot be contacted on base port,
-        if(str != null) {                              // how many times can we increment the port
-            port_range=Integer.parseInt(str);
-            if (port_range < 1) {
-               port_range = 1;    
-            }
-            props.remove("port_range");
-        }
-
-        str=props.getProperty("initial_hosts");
-        if(str != null) {
-            props.remove("initial_hosts");
-            initial_hosts=createInitialHosts(str);
-        }
-
-        return super.setProperties(props);
-    }
-
-
-    @Override // GemStoneAddition
-    public void localAddressSet(Address addr) {
-        // Add own address to initial_hosts if not present: we must always be able to ping ourself !
-        if(initial_hosts != null && addr != null) {
-            if(initial_hosts.contains(addr)) {
-                initial_hosts.remove(addr);
-                if(log.isDebugEnabled()) log.debug("[SET_LOCAL_ADDRESS]: removing my own address (" + addr +
-                                                   ") from initial_hosts; initial_hosts=" + initial_hosts);
-            }
-        }
-    }
-
-
-    @Override // GemStoneAddition
-    public void sendGetMembersRequest(AtomicBoolean waiter_sync) {
-        Message msg;
-
-        wakeWaiter(waiter_sync);
-
-        for(Iterator it=initial_hosts.iterator(); it.hasNext();) {
-            Address addr=(Address)it.next();
-            // if(tmpMbrs.contains(addr)) {
-               // ; // continue; // changed as suggested by Mark Kopec
-            // }
-            msg=new Message(addr, null, null);
-            msg.putHeader(name, new PingHeader(PingHeader.GET_MBRS_REQ, null));
-
-            if(trace) log.trace("[FIND_INITIAL_MBRS] sending PING request to " + msg.getDest());
-            passDown(new Event(Event.MSG, msg));
-        }
-    }
-
-
-
-    /* -------------------------- Private methods ---------------------------- */
-
-    /**
-     * Input is "daddy[8880],sindhu[8880],camille[5555]. Return List of IpAddresses
-     */
-    private ArrayList createInitialHosts(String l) {
-        StringTokenizer tok=new StringTokenizer(l, ",");
-        String          t;
-        IpAddress       addr;
-        ArrayList       retval=new ArrayList();
-
-        while(tok.hasMoreTokens()) {
-            try {
-                t=tok.nextToken();
-                String host=t.substring(0, t.indexOf('['));
-                int port=Integer.parseInt(t.substring(t.indexOf('[') + 1, t.indexOf(']')));
-                for(int i=port; i < port + port_range; i++) {
-                    addr=new IpAddress(host, i);
-                    retval.add(addr);
-                }
-            }
-            catch(NumberFormatException e) {
-                if(log.isErrorEnabled()) log.error(ExternalStrings.TCPPING_EXEPTION_IS__0, e);
-            }
-        }
-
-        return retval;
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP_NIO.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP_NIO.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP_NIO.java
deleted file mode 100644
index 3c9ba7b..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP_NIO.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-package com.gemstone.org.jgroups.protocols;
-
-import com.gemstone.org.jgroups.blocks.ConnectionTable;
-import com.gemstone.org.jgroups.blocks.ConnectionTableNIO;
-
-import java.net.InetAddress;
-import java.util.Properties;
-
-public class TCP_NIO extends TCP
-  {
-
-   /*
-   * (non-Javadoc)
-   *
-   * @see org.jgroups.protocols.TCP#getConnectionTable(long, long)
-   */
-  @Override // GemStoneAddition
-   protected ConnectionTable getConnectionTable(long ri, long cet,
-                                                InetAddress b_addr, InetAddress bc_addr, int s_port, int e_port) throws Exception {
-      ConnectionTableNIO ct = null;
-      if (ri == 0 && cet == 0) {
-         ct = new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port );
-      } else {
-         if (ri == 0) {
-            ri = 5000;
-            if(warn) log.warn("reaper_interval was 0, set it to "
-                  + ri);
-         }
-         if (cet == 0) {
-            cet = 1000 * 60 * 5;
-            if(warn) log.warn("conn_expire_time was 0, set it to "
-                  + cet);
-         }
-         ct = new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, ri, cet);
-      }
-      return ct;
-   }
-
-  @Override // GemStoneAddition
-   public String getName() {
-        return "TCP_NIO";
-    }
-
-   public int getReaderThreads() { return m_reader_threads; }
-   public int getWriterThreads() { return m_writer_threads; }
-   public int getProcessorThreads() { return m_processor_threads; }
-   public int getProcessorMinThreads() { return m_processor_minThreads;}
-   public int getProcessorMaxThreads() { return m_processor_maxThreads;}
-   public int getProcessorQueueSize() { return m_processor_queueSize; }
-   public int getProcessorKeepAliveTime() { return m_processor_keepAliveTime; }
-
-   /** Setup the Protocol instance acording to the configuration string */
-   @Override // GemStoneAddition
-   public boolean setProperties(Properties props) {
-       String str;
-
-       str=props.getProperty("reader_threads");
-       if(str != null) {
-          m_reader_threads=Integer.parseInt(str);
-          props.remove("reader_threads");
-       }
-
-       str=props.getProperty("writer_threads");
-       if(str != null) {
-          m_writer_threads=Integer.parseInt(str);
-          props.remove("writer_threads");
-       }
-
-       str=props.getProperty("processor_threads");
-       if(str != null) {
-          m_processor_threads=Integer.parseInt(str);
-          props.remove("processor_threads");
-       }
-
-      str=props.getProperty("processor_minThreads");
-      if(str != null) {
-         m_processor_minThreads=Integer.parseInt(str);
-         props.remove("processor_minThreads");
-      }
-
-      str=props.getProperty("processor_maxThreads");
-      if(str != null) {
-         m_processor_maxThreads =Integer.parseInt(str);
-         props.remove("processor_maxThreads");
-      }
-
-      str=props.getProperty("processor_queueSize");
-      if(str != null) {
-         m_processor_queueSize=Integer.parseInt(str);
-         props.remove("processor_queueSize");
-      }
-
-      str=props.getProperty("processor_keepAliveTime");
-      if(str != null) {
-         m_processor_keepAliveTime=Integer.parseInt(str);
-         props.remove("processor_keepAliveTime");
-      }
-
-      return super.setProperties(props);
-   }
-
-   private int m_reader_threads = 8;
-
-   private int m_writer_threads = 8;
-
-   private int m_processor_threads = 10;                    // PooledExecutor.createThreads()
-   private int m_processor_minThreads = 10;                 // PooledExecutor.setMinimumPoolSize()
-   private int m_processor_maxThreads = 10;                 // PooledExecutor.setMaxThreads()
-   private int m_processor_queueSize=100;                   // Number of queued requests that can be pending waiting
-                                                            // for a background thread to run the request.
-   private int m_processor_keepAliveTime = -1;              // PooledExecutor.setKeepAliveTime( milliseconds);
-                                                            // A negative value means to wait forever
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TOTAL.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TOTAL.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TOTAL.java
deleted file mode 100644
index 2622a29..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TOTAL.java
+++ /dev/null
@@ -1,1055 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: TOTAL.java,v 1.11 2005/08/08 12:45:44 belaban Exp $
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.oswego.concurrent.ReadWriteLock;
-import com.gemstone.org.jgroups.oswego.concurrent.WriterPreferenceReadWriteLock;
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.View;
-import com.gemstone.org.jgroups.stack.AckSenderWindow;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.TimeScheduler;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.*;
-
-
-/**
- * Implements the total ordering layer using a message sequencer
- * <p/>
- * <p/>
- * The protocol guarantees that all bcast sent messages will be delivered in
- * the same order to all members. For that it uses a sequencer which assignes
- * monotonically increasing sequence ID to broadcasts. Then all group members
- * deliver the bcasts in ascending sequence ID order.
- * <p/>
- * <ul>
- * <li>
- * When a bcast message comes down to this layer, it is placed in the pending
- * down queue. A bcast request is sent to the sequencer.</li>
- * <li>
- * When the sequencer receives a bcast request, it creates a bcast reply
- * message and assigns to it a monotonically increasing seqID and sends it back
- * to the source of the bcast request.</li>
- * <li>
- * When a broadcast reply is received, the corresponding bcast message is
- * assigned the received seqID. Then it is broadcasted.</li>
- * <li>
- * Received bcasts are placed in the up queue. The queue is sorted according
- * to the seqID of the bcast. Any message at the head of the up queue with a
- * seqID equal to the next expected seqID is delivered to the layer above.</li>
- * <li>
- * Unicast messages coming from the layer below are forwarded above.</li>
- * <li>
- * Unicast messages coming from the layer above are forwarded below.</li>
- * </ul>
- * <p/>
- * <i>Please note that once a <code>BLOCK_OK</code> is acknowledged messages
- * coming from above are discarded!</i> Either the application must stop
- * sending messages when a <code>BLOCK</code> event is received from the
- * channel or a QUEUE layer should be placed above this one. Received messages
- * are still delivered above though.
- * <p/>
- * bcast requests are retransmitted periodically until a bcast reply is
- * received. In case a BCAST_REP is on its way during a BCAST_REQ
- * retransmission, then the next BCAST_REP will be to a non-existing
- * BCAST_REQ. So, a nulll BCAST message is sent to fill the created gap in
- * the seqID of all members.
- *
- * @author i.georgiadis@doc.ic.ac.uk
- */
-public class TOTAL extends Protocol  {
-    /**
-     * The header processed by the TOTAL layer and intended for TOTAL
-     * inter-stack communication
-     */
-    public static class Header extends com.gemstone.org.jgroups.Header  {
-        // Header types
-        /**
-         * Null value for the tag
-         */
-        public static final int NULL_TYPE=-1;
-        /**
-         * Request to broadcast by the source
-         */
-        public static final int REQ=0;
-        /**
-         * Reply to broadcast request.
-         */
-        public static final int REP=1;
-        /**
-         * Unicast message
-         */
-        public static final int UCAST=2;
-        /**
-         * Broadcast Message
-         */
-        public static final int BCAST=3;
-
-        /**
-         * The header's type tag
-         */
-        public int type;
-        /**
-         * The ID used by the message source to match replies from the
-         * sequencer
-         */
-        public long localSequenceID;
-        /**
-         * The ID imposing the total order of messages
-         */
-        public long sequenceID;
-
-        /**
-         * used for externalization
-         */
-        public Header() {
-        }
-
-        /**
-         * Create a header for the TOTAL layer
-         *
-         * @param type       the header's type
-         * @param localSeqID the ID used by the sender of broadcasts to match
-         *                   requests with replies from the sequencer
-         * @param seqID      the ID imposing the total order of messages
-         * @throws IllegalArgumentException if the provided header type is
-         *                                  unknown
-         */
-        public Header(int type, long localSeqID, long seqID) {
-            super();
-            switch(type) {
-            case REQ:
-            case REP:
-            case UCAST:
-            case BCAST:
-                this.type=type;
-                break;
-            default:
-                this.type=NULL_TYPE;
-                throw new IllegalArgumentException("type");
-            }
-            this.localSequenceID=localSeqID;
-            this.sequenceID=seqID;
-        }
-
-        /**
-         * For debugging purposes
-         */
-        @Override // GemStoneAddition  
-        public String toString() {
-            StringBuffer buffer=new StringBuffer();
-            String typeName;
-            buffer.append("[TOTAL.Header");
-            switch(type) {
-            case REQ:
-                typeName="REQ";
-                break;
-            case REP:
-                typeName="REP";
-                break;
-            case UCAST:
-                typeName="UCAST";
-                break;
-            case BCAST:
-                typeName="BCAST";
-                break;
-            case NULL_TYPE:
-                typeName="NULL_TYPE";
-                break;
-            default:
-                typeName="";
-                break;
-            }
-            buffer.append(", type=" + typeName);
-            buffer.append(", " + "localID=" + localSequenceID);
-            buffer.append(", " + "seqID=" + sequenceID);
-            buffer.append(']');
-
-            return (buffer.toString());
-        }
-
-        /**
-         * Manual serialization
-         */
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeInt(type);
-            out.writeLong(localSequenceID);
-            out.writeLong(sequenceID);
-        }
-
-        /**
-         * Manual deserialization
-         */
-        public void readExternal(ObjectInput in) throws IOException,
-                                                        ClassNotFoundException {
-            type=in.readInt();
-            localSequenceID=in.readLong();
-            sequenceID=in.readLong();
-        }
-    }
-
-
-    /**
-     * The retransmission listener - It is called by the
-     * <code>AckSenderWindow</code> when a retransmission should occur
-     */
-    private class Command implements AckSenderWindow.RetransmitCommand {
-        Command() {
-        }
-
-        public void retransmit(long seqNo, Message msg) {
-            _retransmitBcastRequest(seqNo);
-        }
-        // GemstoneAddition
-        public long getMaxRetransmissionBurst() {
-          return 0;
-        }
-    }
-
-
-    /**
-     * Protocol name
-     */
-    private static final String PROT_NAME="TOTAL";
-    /**
-     * Property names
-     */
-    private static final String TRACE_PROP="trace";
-
-    /**
-     * Average time between broadcast request retransmissions
-     */
-    private final long[] AVG_RETRANSMIT_INTERVAL=new long[]{1000, 2000, 3000, 4000};
-
-    /**
-     * Null value for the IDs
-     */
-    private static final long NULL_ID=-1;
-    // Layer sending states
-    /**
-     * No group has been joined yet
-     */
-    private static final int NULL_STATE=-1;
-    /**
-     * When set, all messages are sent/received
-     */
-    private static final int RUN=0;
-    /**
-     * When set, only session-specific messages are sent/received, i.e. only
-     * messages essential to the session's integrity
-     */
-    private static final int FLUSH=1;
-    /**
-     * No message is sent to the layer below
-     */
-    private static final int BLOCK=2;
-
-
-    /**
-     * The state lock allowing multiple reads or a single write
-     */
-    private final ReadWriteLock stateLock=new WriterPreferenceReadWriteLock();
-    /**
-     * Protocol layer message-sending state
-     */
-    private int state=NULL_STATE;
-    /**
-     * The address of this stack
-     */
-    private Address addr=null;
-    /**
-     * The address of the sequencer
-     */
-    private Address sequencerAddr=null;
-    /**
-     * The sequencer's seq ID. The ID of the most recently broadcast reply
-     * message
-     */
-    private long sequencerSeqID=NULL_ID;
-    /**
-     * The local sequence ID, i.e. the ID sent with the last broadcast request
-     * message. This is increased with every broadcast request sent to the
-     * sequencer and it's used to match the requests with the sequencer's
-     * replies
-     */
-    private long localSeqID=NULL_ID;
-    /**
-     * The total order sequence ID. This is the ID of the most recently
-     * delivered broadcast message. As the sequence IDs are increasing without
-     * gaps, this is used to detect missing broadcast messages
-     */
-    private long seqID=NULL_ID;
-    /**
-     * The list of unanswered broadcast requests to the sequencer. The entries
-     * are stored in increasing local sequence ID, i.e. in the order they were
-     * <p/>
-     * sent localSeqID -> Broadcast msg to be sent.
-     */
-    private SortedMap reqTbl;
-    /**
-     * The list of received broadcast messages that haven't yet been delivered
-     * to the layer above. The entries are stored in increasing sequence ID,
-     * i.e. in the order they must be delivered above
-     * <p/>
-     * seqID -> Received broadcast msg
-     */
-    private SortedMap upTbl;
-    /**
-     * Retranmitter for pending broadcast requests
-     */
-    private AckSenderWindow retransmitter;
-
-
-    /**
-     * Print addresses in host_ip:port form to bypass DNS
-     */
-    private String _addrToString(Object addr) {
-        return (
-                   addr == null ? "<null>" :
-                ((addr instanceof com.gemstone.org.jgroups.stack.IpAddress) ?
-                (((com.gemstone.org.jgroups.stack.IpAddress)addr).getIpAddress(
-                ).getHostAddress() + ':' +
-                ((com.gemstone.org.jgroups.stack.IpAddress)addr).getPort()) :
-                addr.toString())
-               );
-    }
-
-
-    /**
-     * @return this protocol's name
-     */
-    private String _getName() {
-        return (PROT_NAME);
-    }
-
-    /**
-     * Configure the protocol based on the given list of properties
-     *
-     * @param properties the list of properties to use to setup this layer
-     * @return false if there was any unrecognized property or a property with
-     *         an invalid value
-     */
-    private boolean _setProperties(Properties properties) {
-        String value;
-
-        // trace
-        // Parse & remove property but ignore it; use Trace.trace instead
-        value=properties.getProperty(TRACE_PROP);
-        if(value != null) properties.remove(TRACE_PROP);
-        if(properties.size() > 0) {
-            if(log.isErrorEnabled())
-                log.error("The following properties are not " +
-                          "recognized: " + properties.toString());
-            return (false);
-        }
-        return (true);
-    }
-
-    /**
-     * Events that some layer below must handle
-     *
-     * @return the set of <code>Event</code>s that must be handled by some layer
-     *         below
-     */
-    Vector _requiredDownServices() {
-        Vector services=new Vector();
-
-        return (services);
-    }
-
-    /**
-     * Events that some layer above must handle
-     *
-     * @return the set of <code>Event</code>s that must be handled by some
-     *         layer above
-     */
-    Vector _requiredUpServices() {
-        Vector services=new Vector();
-
-        return (services);
-    }
-
-
-    /**
-     * Extract as many messages as possible from the pending up queue and send
-     * them to the layer above
-     */
-    private void _deliverBcast() {
-        Message msg;
-        Header header;
-
-        synchronized(upTbl) {
-            while((msg=(Message)upTbl.remove(Long.valueOf(seqID + 1))) != null) {
-                header=(Header)msg.removeHeader(getName());
-                if(header.localSequenceID != NULL_ID) passUp(new Event(Event.MSG, msg));
-                ++seqID;
-            }
-        } // synchronized(upTbl)
-    }
-
-
-    /**
-     * Add all undelivered bcasts sent by this member in the req queue and then
-     * replay this queue
-     */
-    private void _replayBcast() {
-        Iterator it;
-        Message msg;
-        Header header;
-
-        // i. Remove all undelivered bcasts sent by this member and place them
-        // again in the pending bcast req queue
-
-        synchronized(upTbl) {
-            if(upTbl.size() > 0)
-                if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_REPLAYING_UNDELIVERED_BCASTS);
-
-            it=upTbl.entrySet().iterator();
-            while(it.hasNext()) {
-                msg=(Message)((Map.Entry)it.next()).getValue();
-                it.remove();
-                if(!msg.getSrc().equals(addr)) {
-                    if(log.isInfoEnabled())
-                        log.info("During replay: " +
-                                 "discarding BCAST[" +
-                                 ((TOTAL.Header)msg.getHeader(getName())).sequenceID +
-                                 "] from " + _addrToString(msg.getSrc()));
-                    continue;
-                }
-                header=(Header)msg.removeHeader(getName());
-                if(header.localSequenceID == NULL_ID) continue;
-                _sendBcastRequest(msg, header.localSequenceID);
-            }
-        } // synchronized(upTbl)
-    }
-
-
-    /**
-     * Send a unicast message: Add a <code>UCAST</code> header
-     *
-     * @param msg the message to unicast
-     * @return the message to send
-     */
-    private Message _sendUcast(Message msg) {
-        msg.putHeader(getName(), new Header(Header.UCAST, NULL_ID, NULL_ID));
-        return (msg);
-    }
-
-
-    /**
-     * Replace the original message with a broadcast request sent to the
-     * sequencer. The original bcast message is stored locally until a reply to
-     * bcast is received from the sequencer. This function has the side-effect
-     * of increasing the <code>localSeqID</code>
-     *
-     * @param msg the message to broadcast
-     */
-    private void _sendBcastRequest(Message msg) {
-        _sendBcastRequest(msg, ++localSeqID);
-    }
-
-
-    /**
-     * Replace the original message with a broadcast request sent to the
-     * sequencer. The original bcast message is stored locally until a reply
-     * to bcast is received from the sequencer
-     *
-     * @param msg the message to broadcast
-     * @param id  the local sequence ID to use
-     */
-    private void _sendBcastRequest(Message msg, long id) {
-
-        // i. Store away the message while waiting for the sequencer's reply
-        // ii. Send a bcast request immediatelly and also schedule a
-        // retransmission
-        synchronized(reqTbl) {
-            reqTbl.put(Long.valueOf(id), msg);
-        }
-        _transmitBcastRequest(id);
-        retransmitter.add(id, msg);
-    }
-
-
-    /**
-     * Send the bcast request with the given localSeqID
-     *
-     * @param seqID the local sequence id of the
-     */
-    private void _transmitBcastRequest(long seqID) {
-        Message reqMsg;
-
-        // i. If NULL_STATE, then ignore, just transient state before
-        // shutting down the retransmission thread
-        // ii. If blocked, be patient - reschedule
-        // iii. If the request is not pending any more, acknowledge it
-        // iv. Create a broadcast request and send it to the sequencer
-
-        if(state == NULL_STATE) {
-            if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_TRANSMIT_BCAST_REQ_0__IN_NULL_STATE, seqID);
-            return;
-        }
-        if(state == BLOCK) return;
-
-        synchronized(reqTbl) {
-            if(!reqTbl.containsKey(Long.valueOf(seqID))) {
-                retransmitter.ack(seqID);
-                return;
-            }
-        }
-        reqMsg=new Message(sequencerAddr, addr, new byte[0]);
-        reqMsg.putHeader(getName(), new Header(Header.REQ, seqID, NULL_ID));
-
-        passDown(new Event(Event.MSG, reqMsg));
-    }
-
-
-    /**
-     * Receive a unicast message: Remove the <code>UCAST</code> header
-     *
-     * @param msg the received unicast message
-     */
-    private void _recvUcast(Message msg) {
-        msg.removeHeader(getName());
-    }
-
-    /**
-     * Receive a broadcast message: Put it in the pending up queue and then
-     * try to deliver above as many messages as possible
-     *
-     * @param msg the received broadcast message
-     */
-    private void _recvBcast(Message msg) {
-        Header header=(Header)msg.getHeader(getName());
-
-        // i. Put the message in the up pending queue only if it's not
-        // already there, as it seems that the event may be received
-        // multiple times before a view change when all members are
-        // negotiating a common set of stable msgs
-        //
-        // ii. Deliver as many messages as possible
-
-        synchronized(upTbl) {
-            if(header.sequenceID <= seqID)
-                return;
-            upTbl.put(Long.valueOf(header.sequenceID), msg);
-        }
-
-        _deliverBcast();
-    }
-
-
-    /**
-     * Received a bcast request - Ignore if not the sequencer, else send a
-     * bcast reply
-     *
-     * @param msg the broadcast request message
-     */
-    private void _recvBcastRequest(Message msg) {
-        Header header;
-        Message repMsg;
-
-        // i. If blocked, discard the bcast request
-        // ii. Assign a seqID to the message and send it back to the requestor
-
-        if(!addr.equals(sequencerAddr)) {
-            if(log.isErrorEnabled())
-                log.error("Received bcast request " +
-                          "but not a sequencer");
-            return;
-        }
-        if(state == BLOCK) {
-            if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_BLOCKED_DISCARD_BCAST_REQ);
-            return;
-        }
-        header=(Header)msg.getHeader(getName());
-        ++sequencerSeqID;
-        repMsg=new Message(msg.getSrc(), addr, new byte[0]);
-        repMsg.putHeader(getName(), new Header(Header.REP, header.localSequenceID,
-                                               sequencerSeqID));
-
-        passDown(new Event(Event.MSG, repMsg));
-    }
-
-
-    /**
-     * Received a bcast reply - Match with the pending bcast request and move
-     * the message in the list of messages to be delivered above
-     *
-     * @param header the header of the bcast reply
-     */
-    private void _recvBcastReply(Header header) {
-        Message msg;
-        long id;
-
-        // i. If blocked, discard the bcast reply
-        //
-        // ii. Assign the received seqID to the message and broadcast it
-        //
-        // iii.
-        // - Acknowledge the message to the retransmitter
-        // - If non-existent BCAST_REQ, send a fake bcast to avoid seqID gaps
-        // - If localID == NULL_ID, it's a null BCAST, else normal BCAST
-        // - Set the seq ID of the message to the one sent by the sequencer
-
-        if(state == BLOCK) {
-            if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_BLOCKED_DISCARD_BCAST_REP);
-            return;
-        }
-
-        synchronized(reqTbl) {
-            msg=(Message)reqTbl.remove(Long.valueOf(header.localSequenceID));
-        }
-
-        if(msg != null) {
-            retransmitter.ack(header.localSequenceID);
-            id=header.localSequenceID;
-        }
-        else {
-            if(log.isInfoEnabled())
-                log.info("Bcast reply to " +
-                         "non-existent BCAST_REQ[" + header.localSequenceID +
-                         "], Sending NULL bcast");
-            id=NULL_ID;
-            msg=new Message(null, addr, new byte[0]);
-        }
-        msg.putHeader(getName(), new Header(Header.BCAST, id, header.sequenceID));
-
-        passDown(new Event(Event.MSG, msg));
-    }
-
-
-    /**
-     * Resend the bcast request with the given localSeqID
-     *
-     * @param seqID the local sequence id of the
-     */
-    protected/*GemStoneAddition*/ void _retransmitBcastRequest(long seqID) {
-        // *** Get a shared lock
-        try {
-            stateLock.readLock().acquire();
-            try {
-                if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_RETRANSMIT_BCAST_REQ_0, seqID);
-                _transmitBcastRequest(seqID);
-            }
-            finally {
-                stateLock.readLock().release();
-            }
-        }
-        catch(InterruptedException e) {
-          Thread.currentThread().interrupt(); // GemStoneAddition
-            log.error(ExternalStrings.TOTAL_FAILED_ACQUIRING_A_READ_LOCK, e);
-        }
-    }
-
-
-    /* Up event handlers
-     * If the return value is true the event travels further up the stack
-     * else it won't be forwarded
-     */
-
-    /**
-     * Prepare for a VIEW_CHANGE: switch to flushing state
-     *
-     * @return true if the event is to be forwarded further up
-     */
-    private boolean _upBlock() {
-        // *** Get an exclusive lock
-        try {
-            stateLock.writeLock().acquire();
-            try {
-                state=FLUSH;
-                // *** Revoke the exclusive lock
-            }
-            finally {
-                stateLock.writeLock().release();
-            }
-        }
-        catch(InterruptedException e) {
-          Thread.currentThread().interrupt(); // GemStoneAddition
-            log.error(ExternalStrings.TOTAL_FAILED_ACQUIRING_THE_WRITE_LOCK, e);
-        }
-
-        return (true);
-    }
-
-
-    /**
-     * Handle an up MSG event
-     *
-     * @param event the MSG event
-     * @return true if the event is to be forwarded further up
-     */
-    private boolean _upMsg(Event event) {
-        Message msg;
-        Object obj;
-        Header header;
-
-        // *** Get a shared lock
-        try {
-            stateLock.readLock().acquire();
-            try {
-
-                // If NULL_STATE, shouldn't receive any msg on the up queue!
-                if(state == NULL_STATE) {
-                    if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_UP_MSG_IN_NULL_STATE);
-                    return (false);
-                }
-
-                // Peek the header:
-                //
-                // (UCAST) A unicast message - Send up the stack
-                // (BCAST) A broadcast message - Handle specially
-                // (REQ) A broadcast request - Handle specially
-                // (REP) A broadcast reply from the sequencer - Handle specially
-                msg=(Message)event.getArg();
-                if(!((obj=msg.getHeader(getName())) instanceof TOTAL.Header)) {
-                    if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_NO_TOTALHEADER_FOUND);
-                    return (false);
-                }
-                header=(Header)obj;
-
-                switch(header.type) {
-                case Header.UCAST:
-                    _recvUcast(msg);
-                    return (true);
-                case Header.BCAST:
-                    _recvBcast(msg);
-                    return (false);
-                case Header.REQ:
-                    _recvBcastRequest(msg);
-                    return (false);
-                case Header.REP:
-                    _recvBcastReply(header);
-                    return (false);
-                default:
-                    if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_UNKNOWN_HEADER_TYPE);
-                    return (false);
-                }
-
-                // ** Revoke the shared lock
-            }
-            finally {
-                stateLock.readLock().release();
-            }
-        }
-        catch(InterruptedException e) {
-          Thread.currentThread().interrupt(); // GemStoneAddition
-            if(log.isErrorEnabled()) log.error(e.getMessage());
-        }
-
-        return (true);
-    }
-
-
-    /**
-     * Set the address of this group member
-     *
-     * @param event the SET_LOCAL_ADDRESS event
-     * @return true if event should be forwarded further up
-     */
-    private boolean _upSetLocalAddress(Event event) {
-        // *** Get an exclusive lock
-        try {
-            stateLock.writeLock().acquire();
-            try {
-                addr=(Address)event.getArg();
-            }
-            finally {
-                stateLock.writeLock().release();
-            }
-        }
-        catch(InterruptedException e) {
-          Thread.currentThread().interrupt();
-            log.error(e.getMessage());
-        }
-        return (true);
-    }
-
-
-    /**
-     * Handle view changes
-     * <p/>
-     * param event the VIEW_CHANGE event
-     *
-     * @return true if the event should be forwarded to the layer above
-     */
-    private boolean _upViewChange(Event event) {
-        Object oldSequencerAddr;
-
-        // *** Get an exclusive lock
-        try {
-            stateLock.writeLock().acquire();
-            try {
-
-                state=RUN;
-
-                // i. See if this member is the sequencer
-                // ii. If this is the sequencer, reset the sequencer's sequence ID
-                // iii. Reset the last received sequence ID
-                //
-                // iv. Replay undelivered bcasts: Put all the undelivered bcasts
-                // sent by us back to the req queue and discard the rest
-                oldSequencerAddr=sequencerAddr;
-                sequencerAddr=
-                        (Address)((View)event.getArg()).getMembers().elementAt(0);
-                if(addr.equals(sequencerAddr)) {
-                    sequencerSeqID=NULL_ID;
-                    if((oldSequencerAddr == null) ||
-                            (!addr.equals(oldSequencerAddr)))
-                        if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_IM_THE_NEW_SEQUENCER);
-                }
-                seqID=NULL_ID;
-                _replayBcast();
-
-                // *** Revoke the exclusive lock
-            }
-            finally {
-                stateLock.writeLock().release();
-            }
-        }
-        catch(InterruptedException e) {
-          Thread.currentThread().interrupt(); // GemStoneAddition
-            log.error(e.getMessage());
-        }
-
-        return (true);
-    }
-
-
-    /*
-     * Down event handlers
-     * If the return value is true the event travels further down the stack
-     * else it won't be forwarded
-     */
-
-
-    /**
-     * Blocking confirmed - No messages should come from above until a
-     * VIEW_CHANGE event is received. Switch to blocking state.
-     *
-     * @return true if event should travel further down
-     */
-    private boolean _downBlockOk() {
-        // *** Get an exclusive lock
-        try {
-            stateLock.writeLock().acquire();
-            try {
-                state=BLOCK;
-            }
-            finally {
-                stateLock.writeLock().release();
-            }
-        }
-        catch(InterruptedException e) {
-          Thread.currentThread().interrupt(); // GemStoneAddition
-            log.error(e.getMessage());
-        }
-
-        return (true);
-    }
-
-
-    /**
-     * A MSG event travelling down the stack. Forward unicast messages, treat
-     * specially the broadcast messages.<br>
-     * <p/>
-     * If in <code>BLOCK</code> state, i.e. it has replied to a
-     * <code>BLOCk_OK</code> and hasn't yet received a
-     * <code>VIEW_CHANGE</code> event, messages are discarded<br>
-     * <p/>
-     * If in <code>FLUSH</code> state, forward unicast but queue broadcasts
-     *
-     * @param event the MSG event
-     * @return true if event should travel further down
-     */
-    private boolean _downMsg(Event event) {
-        Message msg;
-
-        // *** Get a shared lock
-        try {
-            stateLock.readLock().acquire();
-            try {
-
-                // i. Discard all msgs, if in NULL_STATE
-                // ii. Discard all msgs, if blocked
-                if(state == NULL_STATE) {
-                    if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_DISCARD_MSG_IN_NULL_STATE);
-                    return (false);
-                }
-                if(state == BLOCK) {
-                    if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_BLOCKED_DISCARD_MSG);
-                    return (false);
-                }
-
-                msg=(Message)event.getArg();
-                if(msg.getDest() == null) {
-                    _sendBcastRequest(msg);
-                    return (false);
-                }
-                else {
-                    msg=_sendUcast(msg);
-                    event.setArg(msg);
-                }
-
-                // ** Revoke the shared lock
-            }
-            finally {
-                stateLock.readLock().release();
-            }
-        }
-        catch(InterruptedException e) {
-          Thread.currentThread().interrupt(); // GemStoneAddition
-            log.error(e.getMessage());
-        }
-
-        return (true);
-    }
-
-
-    /**
-     * Prepare this layer to receive messages from above
-     */
-    @Override // GemStoneAddition  
-    public void start() throws Exception {
-        TimeScheduler timer;
-
-        timer=stack != null ? stack.timer : null;
-        if(timer == null)
-            throw new Exception("TOTAL.start(): timer is null");
-
-        reqTbl=new TreeMap();
-        upTbl=new TreeMap();
-        retransmitter=new AckSenderWindow(new Command(), AVG_RETRANSMIT_INTERVAL);
-    }
-
-
-    /**
-     * Handle the stop() method travelling down the stack.
-     * <p/>
-     * The local addr is set to null, since after a Start->Stop->Start
-     * sequence this member's addr is not guaranteed to be the same
-     */
-    @Override // GemStoneAddition  
-    public void stop() {
-        try {
-            stateLock.writeLock().acquire();
-            try {
-                state=NULL_STATE;
-                retransmitter.reset();
-                reqTbl.clear();
-                upTbl.clear();
-                addr=null;
-            }
-            finally {
-                stateLock.writeLock().release();
-            }
-        }
-        catch(InterruptedException e) {
-          Thread.currentThread().interrupt(); // GemStoneAddition
-            log.error(e.getMessage());
-        }
-    }
-
-
-    /**
-     * Process an event coming from the layer below
-     *
-     * @param event the event to process
-     */
-    private void _up(Event event) {
-        switch(event.getType()) {
-        case Event.BLOCK:
-            if(!_upBlock()) return;
-            break;
-        case Event.MSG:
-            if(!_upMsg(event)) return;
-            break;
-        case Event.SET_LOCAL_ADDRESS:
-            if(!_upSetLocalAddress(event)) return;
-            break;
-        case Event.VIEW_CHANGE:
-            if(!_upViewChange(event)) return;
-            break;
-        default:
-            break;
-        }
-
-        passUp(event);
-    }
-
-
-    /**
-     * Process an event coming from the layer above
-     *
-     * @param event the event to process
-     */
-    private void _down(Event event) {
-        switch(event.getType()) {
-        case Event.BLOCK_OK:
-            if(!_downBlockOk()) return;
-            break;
-        case Event.MSG:
-            if(!_downMsg(event)) return;
-            break;
-        default:
-            break;
-        }
-
-        passDown(event);
-    }
-
-
-    /**
-     * Create the TOTAL layer
-     */
-    public TOTAL() {
-    }
-
-
-    // Methods deriving from <code>Protocol</code>
-    // javadoc inherited from superclass
-    @Override // GemStoneAddition  
-    public String getName() {
-        return (_getName());
-    }
-
-    // javadoc inherited from superclass
-    @Override // GemStoneAddition  
-    public boolean setProperties(Properties properties) {
-        return (_setProperties(properties));
-    }
-
-    // javadoc inherited from superclass
-    @Override // GemStoneAddition  
-    public Vector requiredDownServices() {
-        return (_requiredDownServices());
-    }
-
-    // javadoc inherited from superclass
-    @Override // GemStoneAddition  
-    public Vector requiredUpServices() {
-        return (_requiredUpServices());
-    }
-
-    // javadoc inherited from superclass
-    @Override // GemStoneAddition  
-    public void up(Event event) {
-        _up(event);
-    }
-
-    // javadoc inherited from superclass
-    @Override // GemStoneAddition  
-    public void down(Event event) {
-        _down(event);
-    }
-}


Mime
View raw message