Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3E6DD18533 for ; Fri, 21 Aug 2015 21:22:42 +0000 (UTC) Received: (qmail 11036 invoked by uid 500); 21 Aug 2015 21:22:42 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 11003 invoked by uid 500); 21 Aug 2015 21:22:42 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 10994 invoked by uid 99); 21 Aug 2015 21:22:41 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 21:22:41 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 6F6A11AA97F for ; Fri, 21 Aug 2015 21:22:41 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.774 X-Spam-Level: * X-Spam-Status: No, score=1.774 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.006] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id MsL3eL3SVRNS for ; Fri, 21 Aug 2015 21:22:28 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 5693F24B15 for ; Fri, 21 Aug 2015 21:22:26 +0000 (UTC) Received: (qmail 9368 invoked by uid 99); 21 Aug 2015 21:22:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 21:22:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8162DE1782; Fri, 21 Aug 2015 21:22:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bschuchardt@apache.org To: commits@geode.incubator.apache.org Date: Fri, 21 Aug 2015 21:22:48 -0000 Message-Id: <6cc2af1ae001474586eb499d4819cc87@git.apache.org> In-Reply-To: <2436fee0ae564f7ea484456c1907c152@git.apache.org> References: <2436fee0ae564f7ea484456c1907c152@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [24/51] [partial] incubator-geode git commit: GEODE-77 removing the old jgroups subproject http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/GemFireTimeSync.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/GemFireTimeSync.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/GemFireTimeSync.java deleted file mode 100644 index e2f033c..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/GemFireTimeSync.java +++ /dev/null @@ -1,720 +0,0 @@ -/** - * An implementation of the Berkeley Algorithm for clock synchronization. - * On view changes and otherwise at a configurable interval this protocol - * sends a request for the current millisecond clock to all members. It - * computes the average round-trip time and the average clock value, throwing - * out samples outside the standard deviation. The result is then used to - * compute and send clock offsets to each member. - * - * @author Bruce Schuchardt - */ -package com.gemstone.org.jgroups.protocols; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; - - - - - -import com.gemstone.org.jgroups.Address; -import com.gemstone.org.jgroups.Event; -import com.gemstone.org.jgroups.Header; -import com.gemstone.org.jgroups.JChannel; -import com.gemstone.org.jgroups.JGroupsVersion; -import com.gemstone.org.jgroups.Message; -import com.gemstone.org.jgroups.View; -import com.gemstone.org.jgroups.protocols.pbcast.GMS; -import com.gemstone.org.jgroups.protocols.pbcast.GMS.GmsHeader; -import com.gemstone.org.jgroups.spi.GFPeerAdapter; -import com.gemstone.org.jgroups.stack.IpAddress; -import com.gemstone.org.jgroups.stack.Protocol; -import com.gemstone.org.jgroups.util.GemFireTracer; -import com.gemstone.org.jgroups.util.ExternalStrings; -import com.gemstone.org.jgroups.util.Streamable; - -public class GemFireTimeSync extends Protocol { - - static private boolean DEBUG = Boolean.getBoolean("gemfire.time-service.debug"); - - private int clockSyncInterval = 100; // every 100 seconds (clock drift of 1 ms per second) - private int replyWaitInterval = 15; // fifteen second wait before timing out - private long currCoordOffset = 0; - - private Address localAddress = null; - private volatile View view; - private final AtomicLong nextProcId = new AtomicLong(0); - private final ConcurrentMap processors = new ConcurrentHashMap(); - private final ConcurrentMap joinTimeOffsets = new ConcurrentHashMap(); - - private ServiceThread syncThread; - // bug #50833 - use a different object to sync on creation of the syncThread - private final Object syncThreadLock = new Object(); - - // Test hook for unit testing. - private TestHook testHook; - - private long joinReqTime = 0; - public static final int TIME_RESPONSES = 0; - public static final int OFFSET_RESPONSE = 1; - - @Override - public String getName() { - return "GemFireTimeSync"; - } - - /** - * This method holds most of the logic for this protocol. The rest of the - * class is composed of utility calculation methods and messaging infrastructure. - * Here we request the current clock from each member, wait for the replies, - * calculate the distributed time using the Berkeley Algorithm and then send - * individual time offsets to each member including this JVM. - */ - synchronized void computeAndSendOffsets(View v) { - - if (v.getMembers().size() < 2) { - return; - } - // send a message containing a reply processor ID and the current time. - // Others will respond with their own time - long currentTime = System.currentTimeMillis(); - long procID = nextProcId.incrementAndGet(); - GFTimeSyncHeader timeHeader = new GFTimeSyncHeader(procID, GFTimeSyncHeader.OP_TIME_REQUEST, currentTime); - ReplyProcessor proc = new ReplyProcessor(view, procID); - processors.put(procID, proc); - - // specify use of UNICAST by setting the message destination. By doing - // this we can take advantage of the isHighPriority flag for OOB messaging - // and avoid cache traffic that might be clogging up the regular send/receive windows - try { - for (Iterator it = v.getMembers().iterator(); it.hasNext();) { - Address mbr = (Address)it.next(); - if (!mbr.equals(this.localAddress)) { - // JGroups requires a different message for each destination but the - // header can be reused - Message timeMessage = new Message(); - timeMessage.setDest(mbr); - timeMessage.isHighPriority = true; - timeMessage.putHeader(getName(), timeHeader); - passDown(new Event(Event.MSG, timeMessage)); - } - } - GFTimeSyncHeader myResponse = new GFTimeSyncHeader(0, (byte)0, currentTime); - proc.replyReceived(this.localAddress, myResponse); - proc.waitForReplies(replyWaitInterval * 1000); - } catch (InterruptedException e) { - return; - } finally { - if (testHook != null) { - testHook.setResponses(proc.responses, currentTime); - testHook.hook(TIME_RESPONSES); - } - processors.remove(procID); - } - - Map responses = proc.responses; - int numResponses = responses.size(); - if (DEBUG || log.getLogWriter().fineEnabled()) { - log.getLogWriter().info(ExternalStrings.DEBUG, "Received " + numResponses + " responses"); - } - - if (numResponses > 1) { - - // now compute the average round-trip time and the average clock time, - // throwing out values outside of the standard deviation for each - - long averageRTT = getMeanRTT(responses, 0, Long.MAX_VALUE); - long rTTStddev = getRTTStdDev(responses, averageRTT); - // now recompute the average throwing out ones that are way off - long newAverageRTT = getMeanRTT(responses, averageRTT, rTTStddev); - if (newAverageRTT > 0) { - averageRTT = newAverageRTT; - } - - long averageTime = getMeanClock(responses, 0, Long.MAX_VALUE); - long stddev = getClockStdDev(responses, averageTime); - long newAverageTime = getMeanClock(responses, averageTime, stddev); - if (newAverageTime > 0) { - averageTime = newAverageTime; - } - - long averageTransmitTime = averageRTT / 2; - long adjustedAverageTime = averageTime + averageTransmitTime; - - if (DEBUG || log.getLogWriter().fineEnabled()) { - StringBuilder buffer = new StringBuilder(5000); - for (Map.Entry entry: responses.entrySet()) { - buffer.append("\n\t").append(entry.getKey()).append(": ").append(entry.getValue()); - } - log.getLogWriter().info(ExternalStrings.DEBUG, "GemFire time service computed " - + "round trip time of " + averageRTT + " with stddev of " + rTTStddev + " and " - + "clock time of " + averageTime + " with stddev of " + stddev - + " for " + numResponses + " members. Details: \n\tstart time=" - + currentTime + " group time=" + adjustedAverageTime + " transmit time=" + averageTransmitTime - + buffer.toString()); - } - - // TODO: should all members on the same machine get the same time offset? - - for (Iterator> it = responses.entrySet().iterator(); it.hasNext(); ) { - Map.Entry entry = it.next(); - IpAddress mbr = (IpAddress)entry.getKey(); - GFTimeSyncHeader response = entry.getValue(); - Message offsetMessage = new Message(); - offsetMessage.setDest(mbr); - offsetMessage.isHighPriority = true; - - long responseTransmitTime = (response.timeReceived - currentTime) / 2; - long offset = adjustedAverageTime - (response.time + responseTransmitTime); - - if (DEBUG || log.getLogWriter().fineEnabled()) { - log.getLogWriter().info(ExternalStrings.DEBUG, "sending time offset of " + offset + " to " + entry.getKey() - + " whose time was " + response.time + " and transmit time was " + responseTransmitTime); - } - - offsetMessage.putHeader(getName(), new GFTimeSyncHeader(0, GFTimeSyncHeader.OP_TIME_OFFSET, offset)); - if (mbr == this.localAddress) { - // We need to cache offset here too just for co-ordinator. - currCoordOffset = offset; - offsetMessage.setSrc(this.localAddress); - up(new Event(Event.MSG, offsetMessage)); - } else { - passDown(new Event(Event.MSG, offsetMessage)); - } - } - } - } - - - @Override - public void up(Event event) { - switch (event.getType()) { - case Event.SET_LOCAL_ADDRESS: - this.localAddress = (Address)event.getArg(); - if (DEBUG || log.getLogWriter().fineEnabled()) { - log.getLogWriter().info(ExternalStrings.DEBUG, "GF time service setting local address to " + this.localAddress); - } - break; - case Event.MSG: - Message msg = (Message)event.getArg(); - GFTimeSyncHeader header = (GFTimeSyncHeader)msg.removeHeader(getName()); - if (header != null) { - switch (header.opType){ - case GFTimeSyncHeader.JOIN_TIME_REQUEST: - long beforeJoinTime = System.currentTimeMillis() + currCoordOffset; - if (DEBUG || log.getLogWriter().fineEnabled()) { - log.getLogWriter().info(ExternalStrings.DEBUG, "GemFire time service received join time offset request from " + msg.getSrc() + " join time=" + beforeJoinTime); - } - - // Store in a map and wait for JOIN reply GMS message. - GFTimeSyncHeader respHeader = new GFTimeSyncHeader(0, GFTimeSyncHeader.JOIN_RESPONSE_OFFSET, currCoordOffset, beforeJoinTime, 0); - joinTimeOffsets.put(msg.getSrc(), respHeader); - break; - case GFTimeSyncHeader.JOIN_RESPONSE_OFFSET: - if (header.coordTimeAfterJoin == 0) { // member sending offset is using old version - ignore it - break; - } - long currentLocalTime = System.currentTimeMillis(); - - if (DEBUG || log.getLogWriter().fineEnabled()) { - log.getLogWriter().info( - ExternalStrings.DEBUG, - " currentLocalTime =" + currentLocalTime - + " coordAfterJoinTime = " + header.coordTimeAfterJoin - + " coordBeforeJoinTime = " + header.coordTimeBeforeJoin - + " joinReqTime = " + joinReqTime); - } - long transmissionTime = ((currentLocalTime - (header.coordTimeAfterJoin - header.coordTimeBeforeJoin)) - joinReqTime)/2; - long timeOffs = header.coordTimeBeforeJoin - (joinReqTime + transmissionTime); - - if (DEBUG || log.getLogWriter().fineEnabled()) { - log.getLogWriter().info(ExternalStrings.DEBUG, "GemFire time service received join time offset from " + msg.getSrc() + " offset=" + timeOffs); - } - GFPeerAdapter mgr = stack.gfPeerFunctions; - if (mgr != null) { - // give the time to the manager who can install it in gemfire - mgr.setCacheTimeOffset(msg.getSrc(), timeOffs, true); - } - break; - case GFTimeSyncHeader.OP_TIME_REQUEST: - if (DEBUG || log.getLogWriter().fineEnabled()) { - log.getLogWriter().info(ExternalStrings.DEBUG, "GemFire time service received time request from " + msg.getSrc()); - } - GFTimeSyncHeader responseHeader = new GFTimeSyncHeader(header.procID, GFTimeSyncHeader.OP_TIME_RESPONSE, System.currentTimeMillis()); - Message response = new Message(); - response.setDest(msg.getSrc()); - response.putHeader(getName(), responseHeader); - response.isHighPriority = true; - passDown(new Event(Event.MSG, response)); - return; - case GFTimeSyncHeader.OP_TIME_RESPONSE: - if (DEBUG || log.getLogWriter().fineEnabled()) { - log.getLogWriter().info(ExternalStrings.DEBUG, "GemFire time service received time response from " + msg.getSrc()); - } - ReplyProcessor p = processors.get(new Long(header.procID)); - if (p != null) { - p.replyReceived(msg.getSrc(), header); - } - return; - case GFTimeSyncHeader.OP_TIME_OFFSET: - long timeOffset = header.time; - if (DEBUG || log.getLogWriter().fineEnabled()) { - log.getLogWriter().info(ExternalStrings.DEBUG, "GemFire time service received time offset update from " + msg.getSrc() + " offset=" + timeOffset); - } - GFPeerAdapter jmm = stack.gfPeerFunctions; - if (jmm != null) { - // give the time offset to the Distribution manager who can set it in GemfireCacheImpl. - jmm.setCacheTimeOffset(msg.getSrc(), timeOffset, false); - } - if (testHook != null) { - testHook.hook(OFFSET_RESPONSE); - } - return; - } - } - } - passUp(event); - } - - /*private long getCurrTimeOffset() { - JGroupMembershipManager mgr = stack.jgmm; - if (mgr != null) { - // give the time offset to the Distribution manager who can set it in GemfireCacheImpl. - DistributedMembershipListener listener = mgr.getListener(); - if (listener != null) { - DistributionManager dm = listener.getDM(); - return dm .getCacheTimeOffset(); - } - } - return 0; - }*/ - - @Override - public void down(Event event) { - switch (event.getType()) { - case Event.VIEW_CHANGE: - View view = (View)event.getArg(); - if (DEBUG || log.getLogWriter().fineEnabled()) { - log.getLogWriter().info(ExternalStrings.DEBUG, "GF time service is processing view " + view); - } - viewChanged(view); - break; - case Event.MSG: - Message msg = (Message)event.getArg(); - GMS.GmsHeader header = (GmsHeader) msg.getHeader(GMS.name); - - if (header != null) { - if (header.getType() == GmsHeader.JOIN_REQ) { - // Send Time Sync OFFSET request header in JOIN_REQ message. - joinReqTime = System.currentTimeMillis(); - GFTimeSyncHeader timeHeader = new GFTimeSyncHeader(0, GFTimeSyncHeader.JOIN_TIME_REQUEST, joinReqTime); - msg.putHeader(getName(), timeHeader); - } else if (header.getType() == GmsHeader.JOIN_RSP) { - // Send the time offset in JOIN_RSP response message. - GFTimeSyncHeader joinTimeSyncHeader = joinTimeOffsets.remove(msg.getDest()); - if (joinTimeSyncHeader != null) { - long afterJoinTime = System.currentTimeMillis() + currCoordOffset; - joinTimeSyncHeader.coordTimeAfterJoin = afterJoinTime; - msg.putHeader(getName(), joinTimeSyncHeader); - - if (DEBUG || log.getLogWriter().fineEnabled()) { - log.getLogWriter().info(ExternalStrings.DEBUG, "GemFire time service is including after-join time in join-response to " + msg.getDest() + " after join time=" + afterJoinTime); - } - } - } - } - break; - } - passDown(event); - } - - private void viewChanged(View newView) { - this.view = (View) newView.clone(); - if (this.localAddress.equals(newView.getCoordinator())) { - boolean newThread = false; - synchronized(this.syncThreadLock) { - if (this.syncThread == null) { - this.syncThread = new ServiceThread(GemFireTracer.GROUP, "GemFire Time Service"); - this.syncThread.setDaemon(true); - this.syncThread.start(); - newThread = true; - } - if (!newThread) { - this.syncThread.computeOffsetsForNewView(); - } - } - } else { - synchronized (this.syncThreadLock) { - if (this.syncThread != null) { - this.syncThread.cancel(); - } - } - } - } - - /** - * retrieves the average of the samples. This can be used with (samples, 0, Long.MAX_VALUE) to get - * the initial mean and then (samples, lastResult, stddev) to get those within the standard deviation. - * @param values - * @param previousMean - * @param stddev - * @return the mean - */ - private long getMeanRTT(Map values, long previousMean, long stddev) { - long totalTime = 0; - long numSamples = 0; - long upperLimit = previousMean + stddev; - for (GFTimeSyncHeader response: values.values()) { - long rtt = response.timeReceived - response.time; - if (rtt <= upperLimit) { - numSamples++; - totalTime += rtt; - } - } - long averageTime = totalTime / numSamples; - return averageTime; - } - - private long getRTTStdDev(Map values, long average) { - long sqDiffs = 0; - for (GFTimeSyncHeader response: values.values()) { - long diff = average - (response.timeReceived - response.time); - sqDiffs += diff * diff; - } - return Math.round(Math.sqrt((double)sqDiffs)); - } - - /** - * retrieves the average of the samples. This can be used with (samples, 0, Long.MAX_VALUE) to get - * the initial mean and then (samples, lastResult, stddev) to get those within the standard deviation. - * @param values - * @param previousMean - * @param stddev - * @return the mean - */ - private long getMeanClock(Map values, long previousMean, long stddev) { - long totalTime = 0; - long numSamples = 0; - long upperLimit = previousMean + stddev; - long lowerLimit = previousMean - stddev; - for (GFTimeSyncHeader response: values.values()) { - if (lowerLimit <= response.time && response.time <= upperLimit) { - numSamples++; - totalTime += response.time; - } - } - long averageTime = totalTime / numSamples; - return averageTime; - } - - private long getClockStdDev(Map values, long average) { - long sqDiffs = 0; - for (GFTimeSyncHeader response: values.values()) { - long diff = average - response.time; - sqDiffs += diff * diff; - } - return Math.round(Math.sqrt((double)sqDiffs)); - } - - @Override - public boolean setProperties(Properties props) { - String str; - - super.setProperties(props); - - str=props.getProperty("clock_sync_interval"); - if (str != null) { - clockSyncInterval = Integer.parseInt(str); - props.remove("clock_sync_interval"); - } - - str=props.getProperty("reply_wait_interval"); - if (str != null) { - replyWaitInterval = Integer.parseInt(str); - props.remove("reply_wait_interval"); - } - - if (props.size() > 0) { - // this will not normally be seen by customers, even if there are unrecognized properties, because - // jgroups error messages aren't displayed unless the debug flag is turned on - log.error(ExternalStrings.DEBUG, "The following GemFireTimeSync properties were not recognized: " + props); - return false; - } - return true; - } - - @Override - public void init() throws Exception { - super.init(); - } - - @Override - public void start() throws Exception { - super.start(); - } - - @Override - public void stop() { - super.stop(); - if (this.syncThread != null) { - this.syncThread.cancel(); - } - } - - static class ReplyProcessor { - int responderCount; - long procID; - Map responses = new HashMap(); - Object doneSync = new Object(); - - ReplyProcessor(View view, long procID) { - responderCount = view.getMembers().size(); - this.procID = procID; - } - - void replyReceived(Address sender, GFTimeSyncHeader response) { - response.timeReceived = System.currentTimeMillis(); - synchronized(responses) { - responses.put(sender, response); - } - synchronized(doneSync) { - if (responses.size() >= responderCount) { - doneSync.notify(); - } - } - } - - boolean done() { - synchronized(responses) { - return responses.size() >= responderCount; - } - } - - void waitForReplies(long timeout) throws InterruptedException { - synchronized(doneSync) { - long endTime = System.currentTimeMillis() + timeout; - while (!done()) { - // compute remaining time in case of spurious wake-up - long remainingTime = endTime - System.currentTimeMillis(); - if (remainingTime <= 0) { - return; - } - doneSync.wait(remainingTime); - } - } - } - } - - public static class GFTimeSyncHeader extends Header implements Streamable { - static final byte OP_TIME_REQUEST = 0; - static final byte OP_TIME_RESPONSE = 1; - static final byte OP_TIME_OFFSET = 2; - static final byte JOIN_TIME_REQUEST = 3; - static final byte JOIN_RESPONSE_OFFSET = 4; - - public long procID; - public byte opType; - public long time; - public long coordTimeBeforeJoin; - public long coordTimeAfterJoin; - public transient long timeReceived; // set by ReplyProcessor when a response is received - - public GFTimeSyncHeader() {} - - public GFTimeSyncHeader(long procID, byte opType, long time) { - super(); - this.procID = procID; - this.opType = opType; - this.time = time; - } - - GFTimeSyncHeader(long procID, byte opType, long time, long beforeTime, long afterTime) { - super(); - this.procID = procID; - this.opType = opType; - this.time = time; - this.coordTimeBeforeJoin = beforeTime; - this.coordTimeAfterJoin = afterTime; - } - - @Override - public long size(short version) { - return super.size(version) + 17; - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(this.procID); - out.writeLong(this.time); - out.write(this.opType); - if (JChannel.getGfFunctions().isVersionForStreamAtLeast(out, JGroupsVersion.GFE_80_ORDINAL)) { - out.writeLong(this.coordTimeBeforeJoin); - out.writeLong(this.coordTimeAfterJoin); - } - } - - @Override - public void readExternal(ObjectInput in) throws IOException, - ClassNotFoundException { - this.procID = in.readLong(); - this.time = in.readLong(); - this.opType = in.readByte(); - if (JChannel.getGfFunctions().isVersionForStreamAtLeast(in, JGroupsVersion.GFE_80_ORDINAL)) { - this.coordTimeBeforeJoin = in.readLong(); - this.coordTimeAfterJoin = in.readLong(); - } - } - - @Override - public void writeTo(DataOutputStream out) throws IOException { - out.writeLong(this.procID); - out.writeLong(this.time); - out.write(this.opType); - if (JChannel.getGfFunctions().isVersionForStreamAtLeast(out, JGroupsVersion.GFE_80_ORDINAL)) { - out.writeLong(this.coordTimeBeforeJoin); - out.writeLong(this.coordTimeAfterJoin); - } - } - - @Override - public void readFrom(DataInputStream in) throws IOException, - IllegalAccessException, InstantiationException { - this.procID = in.readLong(); - this.time = in.readLong(); - this.opType = in.readByte(); - if (JChannel.getGfFunctions().isVersionForStreamAtLeast(in, JGroupsVersion.GFE_80_ORDINAL)) { - this.coordTimeBeforeJoin = in.readLong(); - this.coordTimeAfterJoin = in.readLong(); - } - } - - @Override - public String toString() { - return "SyncMessage(procID=" + this.procID + "; op=" - + op2String(this.opType) + "; time=" + this.time + "; timeRcvd=" - + this.timeReceived + "; coordTimeBeforeJoin=" - + this.coordTimeBeforeJoin + "; coordTimeAfterJoin=" - + this.coordTimeAfterJoin + ")"; - } - - private String op2String(byte op) { - switch (op) { - case OP_TIME_REQUEST: return "REQUEST"; - case OP_TIME_RESPONSE: return "RESPONSE"; - case OP_TIME_OFFSET: return "OFFSET"; - case JOIN_TIME_REQUEST: return "JOIN_TIME_REQUEST"; - case JOIN_RESPONSE_OFFSET: return "JOIN_OFFSET"; - } - return "??"; - } - } - - private class ServiceThread extends Thread { - private boolean cancelled; - private boolean waiting; // true if waiting for next scheduled time - private boolean skipWait; // true if the thread should begin processing as soon as it finishes current view - private Object lock = new Object(); - - ServiceThread(ThreadGroup g, String name) { - super(g,name); - } - - @Override - public void run() { - synchronized(this.lock) { - this.cancelled = false; - } - while (!cancelled()) { - View v = view; - if (v != null && v.getCreator().equals(localAddress)) { - computeAndSendOffsets(v); - } - try { - synchronized(this.lock) { - if (this.skipWait) { - this.skipWait = false; - } else { - this.waiting = true; - try { - this.lock.wait(GemFireTimeSync.this.clockSyncInterval*1000); - } finally { - this.waiting = false; - } - } - } - } catch (InterruptedException e) { - // ignore unless cancelled - } - } - } - - public boolean cancelled() { - synchronized(this.lock) { - return this.cancelled; - } - } - - public void cancel() { - synchronized(this.lock) { - this.cancelled = true; - this.lock.notifyAll(); - } - } - - public void computeOffsetsForNewView() { - synchronized(this.lock) { - if (this.waiting) { - this.lock.notifyAll(); - } else { - this.skipWait = true; - } - } - } - } - - - /** - * Use only for unit testing. - */ - public void invokeServiceThreadForTest() { - if (this.syncThread != null) { - this.syncThread.computeOffsetsForNewView(); - } - } - - /** - * Use only for unit testing. - */ - public boolean isServiceThreadCancelledForTest() { - if (this.syncThread != null) { - return this.syncThread.cancelled(); - } else { - return true; - } - } - - public TestHook getTestHook() { - return testHook; - } - - public void setTestHook(TestHook testHook) { - this.testHook = testHook; - } - - public interface TestHook { - - public void hook(int barrier); - - public void setResponses(Map responses, long currentTime); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HDRS.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HDRS.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HDRS.java deleted file mode 100644 index 6e719eb..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HDRS.java +++ /dev/null @@ -1,53 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: HDRS.java,v 1.2 2004/03/30 06:47:21 belaban Exp $ - -package com.gemstone.org.jgroups.protocols; - - -import com.gemstone.org.jgroups.Event; -import com.gemstone.org.jgroups.Message; -import com.gemstone.org.jgroups.stack.Protocol; - - -/** - * Example of a protocol layer. Contains no real functionality, can be used as a template. - */ -public class HDRS extends Protocol { - @Override // GemStoneAddition - public String getName() {return "HDRS";} - - - private void printMessage(Message msg, String label) { - System.out.println("------------------------- " + label + " ----------------------"); - System.out.println(msg); - msg.printObjectHeaders(); - System.out.println("--------------------------------------------------------------"); - } - - - @Override // GemStoneAddition - public void up(Event evt) { - if(evt.getType() == Event.MSG) { - Message msg=(Message)evt.getArg(); - printMessage(msg, "up"); - } - passUp(evt); // Pass up to the layer above us - } - - - - @Override // GemStoneAddition - public void down(Event evt) { - if(evt.getType() == Event.MSG) { - Message msg=(Message)evt.getArg(); - printMessage(msg, "down"); - } - - passDown(evt); // Pass on to the layer below us - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HTOTAL.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HTOTAL.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HTOTAL.java deleted file mode 100644 index e1583a1..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HTOTAL.java +++ /dev/null @@ -1,207 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: HTOTAL.java,v 1.4 2005/09/01 11:41:00 belaban Exp $ - -package com.gemstone.org.jgroups.protocols; - - -import com.gemstone.org.jgroups.*; -import com.gemstone.org.jgroups.stack.Protocol; -import com.gemstone.org.jgroups.util.ExternalStrings; -import com.gemstone.org.jgroups.util.Streamable; -import com.gemstone.org.jgroups.util.Util; - -import java.io.*; -import java.util.Properties; -import java.util.Vector; - - -/** - * Implementation of UTO-TCP as designed by EPFL. Implements chaining algorithm: each sender sends the message - * to a coordinator who then forwards it to its neighbor on the right, who then forwards it to its neighbor to the right - * etc. - * @author Bela Ban - * @version $Id: HTOTAL.java,v 1.4 2005/09/01 11:41:00 belaban Exp $ - */ -public class HTOTAL extends Protocol { - Address coord=null; - Address neighbor=null; // to whom do we forward the message (member to the right, or null if we're at the tail) - Address local_addr=null; - Vector mbrs=new Vector(); -// boolean is_coord=false; GemStoneAddition - private boolean use_multipoint_forwarding=false; - - - - - public HTOTAL() { - } - - @Override // GemStoneAddition - public final String getName() { - return "HTOTAL"; - } - - @Override // GemStoneAddition - public boolean setProperties(Properties props) { - String str; - - super.setProperties(props); - str=props.getProperty("use_multipoint_forwarding"); - if(str != null) { - use_multipoint_forwarding=Boolean.valueOf(str).booleanValue(); - props.remove("use_multipoint_forwarding"); - } - - if(props.size() > 0) { - log.error(ExternalStrings.HTOTAL_TCPSETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props); - - return false; - } - return true; - } - - @Override // GemStoneAddition - public void down(Event evt) { - switch(evt.getType()) { - case Event.VIEW_CHANGE: - determineCoordinatorAndNextMember((View)evt.getArg()); - break; - case Event.MSG: - Message msg=(Message)evt.getArg(); - Address dest=msg.getDest(); - if(dest == null || dest.isMulticastAddress()) { // only process multipoint messages - if(coord == null) - log.error(ExternalStrings.HTOTAL_COORDINATOR_IS_NULL_CANNOT_SEND_MESSAGE_TO_COORDINATOR); - else { - msg.setSrc(local_addr); - forwardTo(coord, msg); - } - return; // handled here, don't pass down by default - } - break; - } - passDown(evt); - } - - @Override // GemStoneAddition - public void up(Event evt) { - switch(evt.getType()) { - case Event.SET_LOCAL_ADDRESS: - local_addr=(Address)evt.getArg(); - break; - case Event.VIEW_CHANGE: - determineCoordinatorAndNextMember((View)evt.getArg()); - break; - case Event.MSG: - Message msg=(Message)evt.getArg(); - HTotalHeader hdr=(HTotalHeader)msg.getHeader(getName()); - - if(hdr == null) - break; // probably a unicast message, just pass it up - - Message copy=msg.copy(false); // do not copy the buffer - if(use_multipoint_forwarding) { - copy.setDest(null); - passDown(new Event(Event.MSG, copy)); - } - else { - if(neighbor != null) { - forwardTo(neighbor, copy); - } - } - - msg.setDest(hdr.dest); // set destination to be the original destination - msg.setSrc(hdr.src); // set sender to be the original sender (important for retransmission requests) - - passUp(evt); // <-- we modify msg directly inside evt - return; - } - passUp(evt); - } - - private void forwardTo(Address destination, Message msg) { - HTotalHeader hdr=(HTotalHeader)msg.getHeader(getName()); - - if(hdr == null) { - hdr=new HTotalHeader(msg.getDest(), msg.getSrc()); - msg.putHeader(getName(), hdr); - } - msg.setDest(destination); - if(trace) - log.trace("forwarding message to " + destination + ", hdr=" + hdr); - passDown(new Event(Event.MSG, msg)); - } - - - private void determineCoordinatorAndNextMember(View v) { - Object tmp; - Address retval=null; - - mbrs.clear(); - mbrs.addAll(v.getMembers()); - - coord=(Address)(/* mbrs != null && GemStoneAddition (cannot be null) */ mbrs.size() > 0? mbrs.firstElement() : null); -// is_coord=coord != null && local_addr != null && coord.equals(local_addr); GemStoneAddition - - if(/* mbrs == null || GemStoneAddition (cannot be null) */ mbrs.size() < 2 || local_addr == null) - neighbor=null; - else { - for(int i=0; i < mbrs.size(); i++) { - tmp=mbrs.elementAt(i); - if(local_addr.equals(tmp)) { - if(i + 1 >= mbrs.size()) { -// retval=null; // we don't wrap, last member is null GemStoneAddition (redundant assignment) - } - else - retval=(Address)mbrs.elementAt(i + 1); - break; - } - } - } - neighbor=retval; - if(trace) - log.trace("coord=" + coord + ", neighbor=" + neighbor); - } - - - public static class HTotalHeader extends Header implements Streamable { - Address dest, src; - - public HTotalHeader() { - } - - public HTotalHeader(Address dest, Address src) { - this.dest=dest; - this.src=src; - } - - public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(dest); - out.writeObject(src); - } - - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - dest=(Address)in.readObject(); - src=(Address)in.readObject(); - } - - public void writeTo(DataOutputStream out) throws IOException { - Util.writeAddress(dest, out); - Util.writeAddress(src, out); - } - - public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { - dest=Util.readAddress(in); - src=Util.readAddress(in); - } - - @Override // GemStoneAddition - public String toString() { - return "dest=" + dest + ", src=" + src; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOOPBACK.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOOPBACK.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOOPBACK.java deleted file mode 100644 index 407ddeb..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOOPBACK.java +++ /dev/null @@ -1,115 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: LOOPBACK.java,v 1.16 2005/08/26 12:26:33 belaban Exp $ - -package com.gemstone.org.jgroups.protocols; - - - -import com.gemstone.org.jgroups.Address; -import com.gemstone.org.jgroups.Event; -import com.gemstone.org.jgroups.Message; -import com.gemstone.org.jgroups.stack.IpAddress; -import com.gemstone.org.jgroups.stack.Protocol; -import com.gemstone.org.jgroups.util.Util; - - -/** - Makes copies of outgoing messages, swaps sender and receiver and sends the message back up the stack. - */ -public class LOOPBACK extends Protocol { - private Address local_addr=null; - private String group_addr=null; - - public LOOPBACK() { - } - - - @Override // GemStoneAddition - public String toString() { - return "Protocol LOOPBACK(local address: " + local_addr + ')'; - } - - - - - /*------------------------------ Protocol interface ------------------------------ */ - - @Override // GemStoneAddition - public String getName() { - return "LOOPBACK"; - } - - - - @Override // GemStoneAddition - public void init() throws Exception { -// local_addr=new IpAddress("localhost", 10000) { // fake address -// public String toString() { -// return ""; -// } -// }; - - //local_addr=new org.jgroups.stack.IpAddress("localhost", 10000); // fake address - local_addr = new IpAddress(12345); - } - - @Override // GemStoneAddition - public void start() throws Exception { - passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); - } - - - /** - * Caller by the layer above this layer. Usually we just put this Message - * into the send queue and let one or more worker threads handle it. A worker thread - * then removes the Message from the send queue, performs a conversion and adds the - * modified Message to the send queue of the layer below it, by calling Down). - */ - @Override // GemStoneAddition - public void down(Event evt) { - if(trace) - log.trace("event is " + evt + ", group_addr=" + group_addr + - ", time is " + System.currentTimeMillis() + ", hdrs: " + Util.printEvent(evt)); - - switch(evt.getType()) { - - case Event.MSG: - Message msg=(Message)evt.getArg(); - Message rsp=msg.copy(); - if(rsp.getSrc() == null) - rsp.setSrc(local_addr); - - //dest_addr=msg.getDest(); - //rsp.setDest(local_addr); - //rsp.setSrc(dest_addr != null ? dest_addr : local_addr); - up(new Event(Event.MSG, rsp)); - break; - - case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local) - passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); - break; - - case Event.CONNECT: - group_addr=(String)evt.getArg(); - passUp(new Event(Event.CONNECT_OK)); - break; - - case Event.DISCONNECT: - passUp(new Event(Event.DISCONNECT_OK)); - break; - - case Event.PERF: - passUp(evt); - break; - } - } - - - - /*--------------------------- End of Protocol interface -------------------------- */ - - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOSS.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOSS.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOSS.java deleted file mode 100644 index 8fc8f44..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOSS.java +++ /dev/null @@ -1,118 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: LOSS.java,v 1.3 2004/09/23 16:29:41 belaban Exp $ - -package com.gemstone.org.jgroups.protocols; - - -import com.gemstone.org.jgroups.Event; -import com.gemstone.org.jgroups.Message; -import com.gemstone.org.jgroups.View; -import com.gemstone.org.jgroups.stack.Protocol; -import com.gemstone.org.jgroups.util.Util; - -import java.util.Vector; - - -/** - * Example of a protocol layer. Contains no real functionality, can be used as a template. - */ - -public class LOSS extends Protocol { - final Vector members=new Vector(); - static/*GemStoneAddition*/ final long i=0; - boolean drop_next_msg=false; - - /** All protocol names have to be unique ! */ - @Override // GemStoneAddition - public String getName() {return "LOSS";} - - - - /** Just remove if you don't need to reset any state */ - public void reset() {} - - - - -// public void up(Event evt) { -// Message msg; - -// switch(evt.getType()) { - -// case Event.MSG: -// msg=(Message)evt.getArg(); -// if(msg.getDest() != null && !((Address)msg.getDest()).isMulticastAddress()) { -// // System.err.println("LOSS.up(): not dropping msg as it is unicast !"); -// break; -// } - -// i++; - -// int r=((int)(Math.random() * 1000)) % 10; - -// if(r != 0 && i % r == 0) { // drop -// System.out.println("####### LOSS.up(): dropping message " + -// Util.printEvent(evt)); -// return; -// } - -// break; -// } - -// passUp(evt); // Pass up to the layer above us -// } - - - - - - @Override // GemStoneAddition - public void down(Event evt) { - Message msg; - - switch(evt.getType()) { - - case Event.TMP_VIEW: - case Event.VIEW_CHANGE: - Vector new_members=((View)evt.getArg()).getMembers(); - synchronized(members) { - members.removeAllElements(); - if(new_members != null && new_members.size() > 0) - for(int i=0; i < new_members.size(); i++) - members.addElement(new_members.elementAt(i)); - } - passDown(evt); - break; - - case Event.MSG: - if(drop_next_msg) { - drop_next_msg=false; - msg=(Message)evt.getArg(); - - if(msg.getDest() != null && !msg.getDest().isMulticastAddress()) { - break; - } - - - System.out.println("###### LOSS.down(): dropping msg " + Util.printMessage(msg)); - - return; - } - break; - - case Event.DROP_NEXT_MSG: - drop_next_msg=true; - break; - } - - - - passDown(evt); // Pass on to the layer below us - } - - - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE.java deleted file mode 100644 index 7d2b6d7..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE.java +++ /dev/null @@ -1,376 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: MERGE.java,v 1.10 2005/08/11 12:43:47 belaban Exp $ - -package com.gemstone.org.jgroups.protocols; - - - -import com.gemstone.org.jgroups.*; -import com.gemstone.org.jgroups.stack.Protocol; -import com.gemstone.org.jgroups.stack.RouterStub; -import com.gemstone.org.jgroups.util.ExternalStrings; -import com.gemstone.org.jgroups.util.List; -import com.gemstone.org.jgroups.util.Util; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Enumeration; -import java.util.Properties; -import java.util.Vector; - - -/** - * Simple and stupid MERGE protocol (does not take into account state transfer). - * Periodically mcasts a HELLO message with its own address. When a HELLO message is - * received from a member that has the same group (UDP discards all messages with a group - * name different that our own), but is not currently in the group, a MERGE event is sent - * up the stack. The protocol starts working upon receiving a View in which it is the coordinator. - * - * @author Gianluca Collot, Jan 2001 - */ -public class MERGE extends Protocol implements Runnable { - final Vector members=new Vector(); - Address local_addr=null; - String group_addr=null; - final String groupname=null; - - // GemStoneAddition: access hello_thread synchronized on this - Thread hello_thread=null; // thread that periodically mcasts HELLO messages - long timeout=5000; // timeout between mcasting of HELLO messages - - String router_host=null; - int router_port=0; - - RouterStub client=null; - boolean is_server=false; - boolean is_coord=false; - boolean merging=false; - - - @Override // GemStoneAddition - public String getName() { - return "MERGE"; - } - - - @Override // GemStoneAddition - public boolean setProperties(Properties props) { - String str; - - super.setProperties(props); - str=props.getProperty("timeout"); // max time to wait for initial members - if(str != null) { - timeout=Long.parseLong(str); - props.remove("timeout"); - } - - str=props.getProperty("router_host"); // host to send gossip queries (if gossip enabled) - if(str != null) { - router_host=str; - props.remove("router_host"); - } - - str=props.getProperty("router_port"); - if(str != null) { - router_port=Integer.parseInt(str); - props.remove("router_port"); - } - - if(router_host != null && router_port != 0) - client=new RouterStub(router_host, router_port); - - if(props.size() > 0) { - log.error(ExternalStrings.MERGE_MERGESETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props); - - return false; - } - return true; - } - - - @Override // GemStoneAddition - public void start() throws Exception { - synchronized (this) { // GemStoneAddition - if(hello_thread == null) { - hello_thread=new Thread(this, "MERGE Thread"); - hello_thread.setDaemon(true); - hello_thread.start(); - } - } - } - - - @Override // GemStoneAddition - public void stop() { - Thread tmp; - synchronized (this) { // GemStoneAddition - tmp = hello_thread; - hello_thread = null; - } - if(tmp != null && tmp.isAlive()) { -// tmp=hello_thread; -// hello_thread=null; - tmp.interrupt(); - try { - tmp.join(1000); - } - catch(InterruptedException ex) { - Thread.currentThread().interrupt(); // GemStoneAddition - } - } -// hello_thread=null; GemStoneAddition - } - - - @Override // GemStoneAddition - public void up(Event evt) { - Message msg; - Object obj; - MergeHeader hdr; - Address sender; - boolean contains; - Vector tmp; - - - switch(evt.getType()) { - - case Event.MSG: - msg=(Message)evt.getArg(); - obj=msg.getHeader(getName()); - if(obj == null || !(obj instanceof MergeHeader)) { - passUp(evt); - return; - } - hdr=(MergeHeader)msg.removeHeader(getName()); - - switch(hdr.type) { - - case MergeHeader.HELLO: // if coord: handle, else: discard - if(!is_server || !is_coord) { - return; - } - if(merging) { - return; - } - sender=msg.getSrc(); - if((sender != null) && (members.size() >= 0)) { - synchronized(members) { - contains=members.contains(sender); - } - //merge only with lower addresses :prevents cycles and ensures that the new coordinator is correct. - if(!contains && sender.compareTo(local_addr) < 0) { - if(log.isInfoEnabled()) - log.info("membership " + members + - " does not contain " + sender + "; merging it"); - tmp=new Vector(); - tmp.addElement(sender); - merging=true; - passUp(new Event(Event.MERGE, tmp)); - } - } - return; - - default: - if(log.isErrorEnabled()) log.error(ExternalStrings.MERGE_GOT_MERGE_HDR_WITH_UNKNOWN_TYPE_0, hdr.type); - return; - } - - case Event.SET_LOCAL_ADDRESS: - local_addr=(Address)evt.getArg(); - passUp(evt); - break; - - default: - passUp(evt); // Pass up to the layer above us - break; - } - } - - - @Override // GemStoneAddition - public void down(Event evt) { - - switch(evt.getType()) { - - case Event.TMP_VIEW: - passDown(evt); - break; - - case Event.MERGE_DENIED: - merging=false; - passDown(evt); - break; - - case Event.VIEW_CHANGE: - merging=false; - synchronized(members) { - members.clear(); - members.addAll(((View)evt.getArg()).getMembers()); - if(/* (members == null) || GemStoneAddition (cannot be null) */ (members.size() == 0)) { - if(log.isFatalEnabled()) log.fatal("received VIEW_CHANGE with null or empty vector"); - //System.exit(6); - } - } - is_coord=members.elementAt(0).equals(local_addr); - passDown(evt); - if(is_coord) { - if(log.isInfoEnabled()) log.info(ExternalStrings.MERGE_START_SENDING_HELLOS); - try { - start(); - } - catch(Exception ex) { - if(warn) log.warn("exception calling start(): " + ex); - } - } - else { - if(log.isInfoEnabled()) log.info(ExternalStrings.MERGE_STOP_SENDING_HELLOS); - stop(); - } - break; - - case Event.BECOME_SERVER: // called after client has join and is fully working group member - passDown(evt); - try { - start(); - is_server=true; - } - catch(Exception ex) { - if(warn) log.warn("exception calling start(): " + ex); - } - break; - - case Event.CONNECT: - group_addr=(String)evt.getArg(); - passDown(evt); - break; - - case Event.DISCONNECT: - if(local_addr != null && evt.getArg() != null && local_addr.equals(evt.getArg())) - stop(); - passDown(evt); - break; - - default: - passDown(evt); // Pass on to the layer below us - break; - } - } - - - /** - * If IP multicast: periodically mcast a HELLO message - * If gossiping: periodically retrieve the membership. Any members not part of our - * own membership are merged (passing MERGE event up). - */ - public void run() { - Message hello_msg; - MergeHeader hdr; - List rsps; - Vector members_to_merge=new Vector(), tmp; - Object mbr; - - - try { - Thread.sleep(3000); - } /// initial sleep; no premature merging - catch (InterruptedException e) { // GemStoneAddition - return; // exit thread, no need to reset interrupt - } - - - for (;;) { // GemStoneAddition remove coding anti-pattern - try { // GemStoneAddition - Util.sleep(timeout); - } - catch (InterruptedException e) { - break; // exit loop and thread, no need to reset interrupt - } -// if(hello_thread == null) break; GemStoneAddition - if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition - for safety - - if(client == null) { // plain IP MCAST - hello_msg=new Message(null, null, null); - hdr=new MergeHeader(MergeHeader.HELLO); - hello_msg.putHeader(getName(), hdr); - passDown(new Event(Event.MSG, hello_msg)); - } - else { // gossiping; contact Router - rsps=client.get(group_addr); - - synchronized(members) { - members_to_merge.removeAllElements(); - - for(Enumeration e=rsps.elements(); e.hasMoreElements();) { - mbr=e.nextElement(); - if(!members.contains(mbr)) { - - if(log.isInfoEnabled()) - log.info("membership " + members + - " does not contain " + mbr + "; merging it"); - - members_to_merge.addElement(mbr); - } - } - if(members_to_merge.size() > 0) { - Membership new_membership=new Membership(members_to_merge); - new_membership.sort(); - Address coord=(Address)new_membership.elementAt(0); - tmp=new Vector(); - tmp.addElement(coord); - if(coord.compareTo(local_addr) < 0) - passUp(new Event(Event.MERGE, tmp)); - } - } - } - } - } - - - - - - - /* -------------------------- Private methods ---------------------------- */ - - - public static class MergeHeader extends Header { - public static final int HELLO=1; // arg = null - - public int type=0; - - public MergeHeader() { - } // used for externalization - - public MergeHeader(int type) { - this.type=type; - } - - @Override // GemStoneAddition - public String toString() { - return "[MERGE: type=" + type2Str(type) + ']'; - } - - String type2Str(int t) { - switch(t) { - case HELLO: - return "HELLO"; - default: - return ""; - } - } - - public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(type); - } - - - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - type=in.readInt(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE2.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE2.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE2.java deleted file mode 100644 index dd37b68..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE2.java +++ /dev/null @@ -1,362 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: MERGE2.java,v 1.25 2005/10/04 15:47:47 belaban Exp $ - -package com.gemstone.org.jgroups.protocols; - - - -import com.gemstone.org.jgroups.Address; -import com.gemstone.org.jgroups.Event; -import com.gemstone.org.jgroups.View; -import com.gemstone.org.jgroups.stack.Protocol; -import com.gemstone.org.jgroups.util.ExternalStrings; -import com.gemstone.org.jgroups.util.Promise; -import com.gemstone.org.jgroups.util.Util; - -import java.util.Properties; -import java.util.Vector; - - - - -/** - * Protocol to discover subgroups; e.g., existing due to a network partition (that healed). Example: group - * {p,q,r,s,t,u,v,w} is split into 3 subgroups {p,q}, {r,s,t,u} and {v,w}. This protocol will eventually send - * a MERGE event with the coordinators of each subgroup up the stack: {p,r,v}. Note that - depending on the time - * of subgroup discovery - there could also be 2 MERGE events, which first join 2 of the subgroups, and then the - * resulting group to the last subgroup. The real work of merging the subgroups into one larger group is done - * somewhere above this protocol (typically in the GMS protocol).

- * This protocol works as follows: - *

    - *
  • If coordinator: periodically retrieve the initial membership (using the FIND_INITIAL_MBRS event provided e.g. - * by PING or TCPPING protocols. This list contains {coord,addr} pairs. - *
  • If there is more than 1 coordinator: - *
      - *
    1. Get all coordinators - *
    2. Create a MERGE event with the list of coordinators as argument - *
    3. Send the event up the stack - *
    - *
- * - *

- * - * Requires: FIND_INITIAL_MBRS event from below
- * Provides: sends MERGE event with list of coordinators up the stack
- * @author Bela Ban, Oct 16 2001 - */ -public class MERGE2 extends Protocol { - Address local_addr=null; - FindSubgroups task=null; // task periodically executing as long as we are coordinator - private final Object task_lock=new Object(); - long min_interval=5000; // minimum time between executions of the FindSubgroups task - long max_interval=20000; // maximum time between executions of the FindSubgroups task - boolean is_coord=false; - final Promise find_promise=new Promise(); // to synchronize FindSubgroups.findInitialMembers() on - - /** Use a new thread to send the MERGE event up the stack */ - boolean use_separate_thread=false; - - - @Override // GemStoneAddition - public String getName() { - return "MERGE2"; - } - - public long getMinInterval() { - return min_interval; - } - - public void setMinInterval(long i) { - min_interval=i; - } - - public long getMaxInterval() { - return max_interval; - } - - public void setMaxInterval(long l) { - max_interval=l; - } - - - @Override // GemStoneAddition - public boolean setProperties(Properties props) { - String str; - - super.setProperties(props); - str=props.getProperty("min_interval"); - if(str != null) { - min_interval=Long.parseLong(str); - props.remove("min_interval"); - } - - str=props.getProperty("max_interval"); - if(str != null) { - max_interval=Long.parseLong(str); - props.remove("max_interval"); - } - - if(min_interval <= 0 || max_interval <= 0) { - if(log.isErrorEnabled()) log.error(ExternalStrings.MERGE2_MIN_INTERVAL_AND_MAX_INTERVAL_HAVE_TO_BE__0); - return false; - } - if(max_interval <= min_interval) { - if(log.isErrorEnabled()) log.error(ExternalStrings.MERGE2_MAX_INTERVAL_HAS_TO_BE_GREATER_THAN_MIN_INTERVAL); - return false; - } - - str=props.getProperty("use_separate_thread"); - if(str != null) { - use_separate_thread=Boolean.valueOf(str).booleanValue(); - props.remove("use_separate_thread"); - } - - if(props.size() > 0) { - log.error(ExternalStrings.MERGE2_MERGE2SETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props); - - return false; - } - return true; - } - - - @Override // GemStoneAddition - public Vector requiredDownServices() { - Vector retval=new Vector(1); - retval.addElement(Integer.valueOf(Event.FIND_INITIAL_MBRS)); - return retval; - } - - - @Override // GemStoneAddition - public void stop() { - is_coord=false; - stopTask(); - } - - - /** - * This prevents the up-handler thread to be created, which is not needed in the protocol. - * DON'T REMOVE ! - */ - @Override // GemStoneAddition - public void startUpHandler() { - } - - - /** - * This prevents the down-handler thread to be created, which is not needed in the protocol. - * DON'T REMOVE ! - */ - @Override // GemStoneAddition - public void startDownHandler() { - } - - - @Override // GemStoneAddition - public void up(Event evt) { - switch(evt.getType()) { - - case Event.SET_LOCAL_ADDRESS: - local_addr=(Address)evt.getArg(); - passUp(evt); - break; - - case Event.FIND_INITIAL_MBRS_OK: - find_promise.setResult(evt.getArg()); - passUp(evt); // could be needed by GMS - break; - - default: - passUp(evt); // Pass up to the layer above us - break; - } - } - - - @Override // GemStoneAddition - public void down(Event evt) { - Vector mbrs; - Address coord; - - switch(evt.getType()) { - - case Event.VIEW_CHANGE: - passDown(evt); - mbrs=((View)evt.getArg()).getMembers(); - if(mbrs == null || mbrs.size() == 0 || local_addr == null) { - stopTask(); - break; - } - coord=(Address)mbrs.elementAt(0); - if(coord.equals(local_addr)) { - is_coord=true; - startTask(); // start task if we became coordinator (doesn't start if already running) - } - else { - // if we were coordinator, but are no longer, stop task. this happens e.g. when we merge and someone - // else becomes the new coordinator of the merged group - if(is_coord) { - is_coord=false; - } - stopTask(); - } - break; - - default: - passDown(evt); // Pass on to the layer below us - break; - } - } - - - /* -------------------------------------- Private Methods --------------------------------------- */ - void startTask() { - synchronized(task_lock) { - if(task == null) - task=new FindSubgroups(); - task.start(); - } - } - - void stopTask() { - synchronized(task_lock) { - if(task != null) { - task.stop(); // will cause timer to remove task from execution schedule - task=null; - } - } - } - /* ---------------------------------- End of Private Methods ------------------------------------ */ - - - - - /** - * Task periodically executing (if role is coordinator). Gets the initial membership and determines - * whether there are subgroups (multiple coordinators for the same group). If yes, it sends a MERGE event - * with the list of the coordinators up the stack - */ - protected/*GemStoneAddition*/ class FindSubgroups implements Runnable { - // GemStoneAddition: #thread must be synchronized on this - Thread thread=null; - - - public synchronized /* GemStoneAddition */ void start() { - if(thread == null || !thread.isAlive()) { - thread=new Thread(this, "MERGE2.FindSubgroups thread"); - thread.setDaemon(true); - thread.start(); - } - } - - - public synchronized /* GemStoneAddition */ void stop() { - if(thread != null) { - Thread tmp=thread; - thread=null; - tmp.interrupt(); // wakes up sleeping thread - find_promise.reset(); - } - thread=null; - } - - - public void run() { - long interval; - Vector coords; - Vector initial_mbrs; - - // if(log.isDebugEnabled()) log.debug("merge task started as I'm the coordinator"); - for (;;) { // GemStoneAddition remove coding anti-pattern - interval=computeInterval(); - try { // GemStoneAddition - Util.sleep(interval); - } - catch (InterruptedException e) { - break; // exits thread - } -// if(thread == null) break; GemStoneAddition - if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition - initial_mbrs=findInitialMembers(); -// if(thread == null) break; GemStoneAddition - if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition - if(log.isDebugEnabled()) log.debug("initial_mbrs=" + initial_mbrs); - coords=detectMultipleCoordinators(initial_mbrs); - if(coords != null && coords.size() > 1) { - if(log.isDebugEnabled()) - log.debug("found multiple coordinators: " + coords + "; sending up MERGE event"); - final Event evt=new Event(Event.MERGE, coords); - if(use_separate_thread) { - Thread merge_notifier=new Thread() { - @Override // GemStoneAddition - public void run() { - passUp(evt); - } - }; - merge_notifier.setDaemon(true); - merge_notifier.setName("merge notifier thread"); - merge_notifier.start(); - } - else { - passUp(evt); - } - } - } - if(trace) - log.trace("MERGE2.FindSubgroups thread terminated (local_addr=" + local_addr + ")"); - } - - - /** - * Returns a random value within [min_interval - max_interval] - */ - long computeInterval() { - return min_interval + Util.random(max_interval - min_interval); - } - - - /** - * Returns a list of PingRsp pairs. - */ - Vector findInitialMembers() { - PingRsp tmp=new PingRsp(local_addr, local_addr, true); - find_promise.reset(); - passDown(Event.FIND_INITIAL_MBRS_EVT); - Vector retval=(Vector)find_promise.getResult(0); // wait indefinitely until response is received - if(retval != null && is_coord && local_addr != null && !retval.contains(tmp)) - retval.add(tmp); - return retval; - } - - - /** - * Finds out if there is more than 1 coordinator in the initial_mbrs vector (contains PingRsp elements). - * @param initial_mbrs A list of PingRsp pairs - * @return Vector A list of the coordinators (Addresses) found. Will contain just 1 element for a correct - * membership, and more than 1 for multiple coordinators - */ - Vector detectMultipleCoordinators(Vector initial_mbrs) { - Vector ret=new Vector(11); - PingRsp rsp; - Address coord; - - if(initial_mbrs == null) return null; - for(int i=0; i < initial_mbrs.size(); i++) { - rsp=(PingRsp)initial_mbrs.elementAt(i); - if(!rsp.is_server) - continue; - coord=rsp.getCoordAddress(); - if(!ret.contains(coord)) - ret.addElement(coord); - } - - return ret; - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE3.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE3.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE3.java deleted file mode 100644 index b71da43..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE3.java +++ /dev/null @@ -1,312 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: MERGE3.java,v 1.8 2005/08/08 12:45:43 belaban Exp $ - -package com.gemstone.org.jgroups.protocols; - - - -import com.gemstone.org.jgroups.*; -import com.gemstone.org.jgroups.stack.Protocol; -import com.gemstone.org.jgroups.util.ExternalStrings; -import com.gemstone.org.jgroups.util.TimeScheduler; -import com.gemstone.org.jgroups.util.Util; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.*; - - - - -/** - * Protocol to discover subgroups; e.g., existing due to a network partition (that healed). Example: group - * {p,q,r,s,t,u,v,w} is split into 3 subgroups {p,q}, {r,s,t,u} and {v,w}. This protocol will eventually send - * a MERGE event with the coordinators of each subgroup up the stack: {p,r,v}. Note that - depending on the time - * of subgroup discovery - there could also be 2 MERGE events, which first join 2 of the subgroups, and then the - * resulting group to the last subgroup. The real work of merging the subgroups into one larger group is done - * somewhere above this protocol (typically in the GMS protocol).

- * This protocol works as follows: - *

    - *
  • If coordinator: periodically broadcast a "I'm the coordinator" message. If a coordinator receives such - * a message, it immediately initiates a merge by sending up a MERGE event - *

    - * - * Provides: sends MERGE event with list of coordinators up the stack
    - * @author Bela Ban, Oct 16 2001 - */ -public class MERGE3 extends Protocol { - Address local_addr=null; - long min_interval=5000; // minimum time between executions of the FindSubgroups task - long max_interval=20000; // maximum time between executions of the FindSubgroups task - boolean is_coord=false; - final Vector mbrs=new Vector(); - TimeScheduler timer=null; - CoordinatorAnnouncer announcer_task=null; - final Set announcements=Collections.synchronizedSet(new HashSet()); - - /** Use a new thread to send the MERGE event up the stack */ - boolean use_separate_thread=false; - - - - - @Override // GemStoneAddition - public String getName() { - return "MERGE3"; - } - - - @Override // GemStoneAddition - public boolean setProperties(Properties props) { - String str; - - super.setProperties(props); - str=props.getProperty("min_interval"); - if(str != null) { - min_interval=Long.parseLong(str); - props.remove("min_interval"); - } - - str=props.getProperty("max_interval"); - if(str != null) { - max_interval=Long.parseLong(str); - props.remove("max_interval"); - } - - if(min_interval <= 0 || max_interval <= 0) { - if(log.isErrorEnabled()) log.error(ExternalStrings.MERGE3_MIN_INTERVAL_AND_MAX_INTERVAL_HAVE_TO_BE__0); - return false; - } - if(max_interval <= min_interval) { - if(log.isErrorEnabled()) log.error(ExternalStrings.MERGE3_MAX_INTERVAL_HAS_TO_BE_GREATER_THAN_MIN_INTERVAL); - return false; - } - - str=props.getProperty("use_separate_thread"); - if(str != null) { - use_separate_thread=Boolean.valueOf(str).booleanValue(); - props.remove("use_separate_thread"); - } - - if(props.size() > 0) { - log.error(ExternalStrings.MERGE3_MERGE2SETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props); - - return false; - } - return true; - } - - @Override // GemStoneAddition - public void init() throws Exception { - timer=stack.timer; - } - - - /** - * This prevents the up-handler thread to be created, which is not needed in the protocol. - * DON'T REMOVE ! - */ - @Override // GemStoneAddition - public void startUpHandler() { - } - - - /** - * This prevents the down-handler thread to be created, which is not needed in the protocol. - * DON'T REMOVE ! - */ - @Override // GemStoneAddition - public void startDownHandler() { - } - - - @Override // GemStoneAddition - public void up(Event evt) { - switch(evt.getType()) { - - case Event.MSG: - Message msg=(Message)evt.getArg(); - CoordAnnouncement hdr=(CoordAnnouncement)msg.removeHeader(getName()); - if(hdr != null) { - if(hdr.coord_addr != null && is_coord) { - boolean contains; - contains=announcements.contains(hdr.coord_addr); - announcements.add(hdr.coord_addr); - if(log.isDebugEnabled()) { - if(contains) - log.debug("discarded duplicate announcement: " + hdr.coord_addr + - ", announcements=" + announcements); - else - log.debug("received announcement: " + hdr.coord_addr + ", announcements=" + announcements); - } - - if(announcements.size() > 1 && is_coord) { - processAnnouncements(); - } - } - } - else - passUp(evt); - break; - - case Event.SET_LOCAL_ADDRESS: - local_addr=(Address)evt.getArg(); - passUp(evt); - break; - - default: - passUp(evt); // Pass up to the layer above us - break; - } - } - - - @Override // GemStoneAddition - public void down(Event evt) { - Vector tmp; - Address coord; - - switch(evt.getType()) { - - case Event.VIEW_CHANGE: - passDown(evt); - tmp=((View)evt.getArg()).getMembers(); - mbrs.clear(); - mbrs.addAll(tmp); - coord=(Address)mbrs.elementAt(0); - if(coord.equals(local_addr)) { - if(is_coord == false) { - is_coord=true; - startCoordAnnouncerTask(); - } - } - else { - if(is_coord == true) { - is_coord=false; - stopCoordAnnouncerTask(); - } - } - break; - - default: - passDown(evt); // Pass on to the layer below us - break; - } - } - - - void startCoordAnnouncerTask() { - if(announcer_task == null) { - announcements.add(local_addr); - announcer_task=new CoordinatorAnnouncer(); - timer.add(announcer_task); - if(log.isDebugEnabled()) - log.debug("coordinator announcement task started, announcements=" + announcements); - } - } - - void stopCoordAnnouncerTask() { - if(announcer_task != null) { - announcer_task.stop(); - announcer_task=null; - announcements.clear(); - if(log.isDebugEnabled()) - log.debug("coordinator announcement task stopped"); - } - } - - - - /** - * Returns a random value within [min_interval - max_interval] - */ - long computeInterval() { - return min_interval + Util.random(max_interval - min_interval); - } - - - - void sendCoordinatorAnnouncement(Address coord) { - Message coord_announcement=new Message(); // multicast to all - CoordAnnouncement hdr=new CoordAnnouncement(coord); - coord_announcement.putHeader(getName(), hdr); - passDown(new Event(Event.MSG, coord_announcement)); - } - - void processAnnouncements() { - if(announcements.size() > 1) { - Vector coords=new Vector(announcements); // create a clone - if(coords.size() > 1) { - if(log.isDebugEnabled()) - log.debug("passing up MERGE event, coords=" + coords); - final Event evt=new Event(Event.MERGE, coords); - if(use_separate_thread) { - Thread merge_notifier=new Thread() { - @Override // GemStoneAddition - public void run() { - passUp(evt); - } - }; - merge_notifier.setDaemon(true); - merge_notifier.setName("merge notifier thread"); - } - else { - passUp(evt); - } - } - announcements.clear(); - } - } - - - class CoordinatorAnnouncer implements TimeScheduler.Task { - boolean cancelled=false; - - public void start() { - cancelled=false; - } - - public void stop() { - cancelled=true; - } - - public boolean cancelled() { - return cancelled; - } - - public long nextInterval() { - return computeInterval(); - } - - public void run() { - if(is_coord) - sendCoordinatorAnnouncement(local_addr); - } - } - - - - public static class CoordAnnouncement extends Header { - Address coord_addr=null; - - public CoordAnnouncement() { - } - - public CoordAnnouncement(Address coord) { - this.coord_addr=coord; - } - - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - coord_addr=(Address)in.readObject(); - } - - public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(coord_addr); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGEFAST.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGEFAST.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGEFAST.java deleted file mode 100644 index 6f98867..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGEFAST.java +++ /dev/null @@ -1,118 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -package com.gemstone.org.jgroups.protocols; - - -import com.gemstone.org.jgroups.*; -import com.gemstone.org.jgroups.stack.Protocol; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Vector; - -/** - * The coordinator attaches a small header to each (or every nth) message. If another coordinator in the - * same group sees the message, it will initiate the merge protocol immediately by sending a MERGE - * event up the stack. - * @author Bela Ban, Aug 25 2003 - */ -public class MERGEFAST extends Protocol { - Address local_addr=null; - boolean is_coord=false; - static final String name="MERGEFAST"; - - @Override // GemStoneAddition - public String getName() { - return name; - } - - - @Override // GemStoneAddition - public void down(Event evt) { - if(is_coord == true && evt.getType() == Event.MSG && local_addr != null) { - Message msg=(Message)evt.getArg(); - Address dest=msg.getDest(); - if(dest == null || dest.isMulticastAddress()) { - msg.putHeader(getName(), new MergefastHeader(local_addr)); - } - } - - if(evt.getType() == Event.VIEW_CHANGE) { - handleViewChange((View)evt.getArg()); - } - - passDown(evt); - } - - - - @Override // GemStoneAddition - public void up(Event evt) { - switch(evt.getType()) { - case Event.SET_LOCAL_ADDRESS: - local_addr=(Address)evt.getArg(); - break; - case Event.MSG: - if(is_coord == false) // only handle message if we are coordinator - break; - Message msg=(Message)evt.getArg(); - MergefastHeader hdr=(MergefastHeader)msg.removeHeader(name); - passUp(evt); - if(hdr != null && local_addr != null) { - Address other_coord=hdr.coord; - if(!local_addr.equals(other_coord)) { - sendUpMerge(new Address[]{local_addr, other_coord}); - } - } - return; // event was already passed up - case Event.VIEW_CHANGE: - handleViewChange((View)evt.getArg()); - break; - } - passUp(evt); - } - - - void handleViewChange(View v) { - Vector mbrs; - if(local_addr == null) - return; - mbrs=v.getMembers(); - is_coord=mbrs != null && mbrs.size() > 0 && local_addr.equals(mbrs.firstElement()); - } - - // @todo avoid sending up too many MERGE events. - void sendUpMerge(Address[] addresses) { - Vector v=new Vector(11); - for(int i=0; i < addresses.length; i++) { - Address addr=addresses[i]; - v.add(addr); - } - passUp(new Event(Event.MERGE, v)); - } - - - public static class MergefastHeader extends Header { - Address coord=null; - - public MergefastHeader() { - } - - public MergefastHeader(Address coord) { - this.coord=coord; - } - - public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(coord); - } - - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - coord=(Address)in.readObject(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MessageProtocolEXAMPLE.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MessageProtocolEXAMPLE.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MessageProtocolEXAMPLE.java deleted file mode 100644 index ff206ed..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MessageProtocolEXAMPLE.java +++ /dev/null @@ -1,64 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: MessageProtocolEXAMPLE.java,v 1.2 2004/03/30 06:47:21 belaban Exp $ - -package com.gemstone.org.jgroups.protocols; - - - - - -import com.gemstone.org.jgroups.Event; -import com.gemstone.org.jgroups.Message; -import com.gemstone.org.jgroups.stack.MessageProtocol; - - - - - - -/** - - */ -public class MessageProtocolEXAMPLE extends MessageProtocol { - - @Override // GemStoneAddition - public String getName() {return "MessageProtocolEXAMPLE";} - - - /** - Callback. Called when a request for this protocol layer is received. - */ - @Override // GemStoneAddition - public Object handle(Message req) { - System.out.println("MessageProtocolEXAMPLE.handle(): this method should be overridden !"); - return null; - } - - - - - /** - Callback. Called by superclass when event may be handled.

    - Do not use PassUp in this method as the event is passed up - by default by the superclass after this method returns ! - @return boolean Defaults to true. If false, event will not be passed up the stack. - */ - @Override // GemStoneAddition - public boolean handleUpEvent(Event evt) {return true;} - - - /** - Callback. Called by superclass when event may be handled.

    - Do not use PassDown in this method as the event is passed down - by default by the superclass after this method returns ! - @return boolean Defaults to true. If false, event will not be passed down the stack. - */ - @Override // GemStoneAddition - public boolean handleDownEvent(Event evt) {return true;} - - - -}