Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EEC56200B5E for ; Wed, 10 Aug 2016 21:41:07 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id ED6B1160AB2; Wed, 10 Aug 2016 19:41:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BBE18160AB1 for ; Wed, 10 Aug 2016 21:41:05 +0200 (CEST) Received: (qmail 55416 invoked by uid 500); 10 Aug 2016 19:41:04 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 55283 invoked by uid 99); 10 Aug 2016 19:41:04 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Aug 2016 19:41:04 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 3E53C1A139E for ; Wed, 10 Aug 2016 19:41:04 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.645 X-Spam-Level: X-Spam-Status: No, score=-4.645 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id Nh39346XhYbe for ; Wed, 10 Aug 2016 19:40:55 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 9E1DB60E72 for ; Wed, 10 Aug 2016 19:40:49 +0000 (UTC) Received: (qmail 50455 invoked by uid 99); 10 Aug 2016 19:40:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Aug 2016 19:40:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EB19BE3889; Wed, 10 Aug 2016 19:40:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: udo@apache.org To: commits@geode.incubator.apache.org Date: Wed, 10 Aug 2016 19:41:29 -0000 Message-Id: <706f37b52bcc459dbae2066220afb516@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [43/50] [abbrv] incubator-geode git commit: GEODE-1726 Clean up compilation warnings in new GMS archived-at: Wed, 10 Aug 2016 19:41:08 -0000 GEODE-1726 Clean up compilation warnings in new GMS Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/5f840549 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/5f840549 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/5f840549 Branch: refs/heads/feature/GEODE-420 Commit: 5f84054955dc873add294cc0115f9d52e420e168 Parents: f3db3e8 Author: Bruce Schuchardt Authored: Fri Aug 5 11:26:43 2016 -0700 Committer: Bruce Schuchardt Committed: Fri Aug 5 11:32:35 2016 -0700 ---------------------------------------------------------------------- .../internal/membership/NetView.java | 80 ++-- .../internal/membership/gms/GMSUtil.java | 30 +- .../internal/membership/gms/ServiceConfig.java | 56 +-- .../internal/membership/gms/Services.java | 3 +- .../membership/gms/fd/GMSHealthMonitor.java | 218 +++++----- .../membership/gms/locator/GMSLocator.java | 19 +- .../membership/gms/membership/GMSJoinLeave.java | 53 ++- .../gms/messenger/GMSQuorumChecker.java | 43 +- .../gms/messenger/JGroupsMessenger.java | 77 ++-- .../gms/mgr/GMSMembershipManager.java | 413 +++++++------------ .../gms/membership/GMSJoinLeaveJUnitTest.java | 2 +- .../sanctionedDataSerializables.txt | 4 +- .../codeAnalysis/sanctionedSerializables.txt | 2 +- 13 files changed, 387 insertions(+), 613 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5f840549/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java index 4c33e08..f3c04f7 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java @@ -28,8 +28,8 @@ import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.Set; +import java.util.stream.*; -import com.gemstone.gemfire.internal.logging.LogService; import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.DataSerializer; @@ -46,6 +46,7 @@ import com.gemstone.gemfire.internal.Version; * * @since GemFire 5.5 */ +@SuppressWarnings("SynchronizeOnNonFinalField") public class NetView implements DataSerializableFixedID { private int viewId; @@ -60,7 +61,7 @@ public class NetView implements DataSerializableFixedID { public NetView() { viewId = 0; - members = new ArrayList(4); + members = new ArrayList<>(4); this.hashedMembers = new HashSet<>(members); shutdownMembers = Collections.emptySet(); crashedMembers = new HashSet<>(); @@ -70,7 +71,7 @@ public class NetView implements DataSerializableFixedID { public NetView(InternalDistributedMember creator) { viewId = 0; - members = new ArrayList(4); + members = new ArrayList<>(4); members.add(creator); hashedMembers = new HashSet<>(members); shutdownMembers = new HashSet<>(); @@ -81,7 +82,7 @@ public class NetView implements DataSerializableFixedID { public NetView(InternalDistributedMember creator, int viewId, List members) { this.viewId = viewId; - this.members = new ArrayList(members); + this.members = new ArrayList<>(members); hashedMembers = new HashSet<>(this.members); shutdownMembers = new HashSet<>(); crashedMembers = Collections.emptySet(); @@ -90,11 +91,15 @@ public class NetView implements DataSerializableFixedID { } - // legacy method for JGMM + /** + * Test method + * @param size size of the view, used for presizing collections + * @param viewId the ID of the view + */ public NetView(int size, long viewId) { this.viewId = (int) viewId; - members = new ArrayList(size); - this.hashedMembers = new HashSet(); + members = new ArrayList<>(size); + this.hashedMembers = new HashSet<>(); shutdownMembers = new HashSet<>(); crashedMembers = Collections.emptySet(); creator = null; @@ -104,18 +109,16 @@ public class NetView implements DataSerializableFixedID { /** * Create a new view with the contents of the given view and the * specified view ID - * @param other - * @param viewId */ public NetView(NetView other, int viewId) { this.creator = other.creator; this.viewId = viewId; - this.members = new ArrayList(other.members); - this.hashedMembers = new HashSet(other.members); + this.members = new ArrayList<>(other.members); + this.hashedMembers = new HashSet<>(other.members); this.failureDetectionPorts = new int[other.failureDetectionPorts.length]; System.arraycopy(other.failureDetectionPorts, 0, this.failureDetectionPorts, 0, other.failureDetectionPorts.length); - this.shutdownMembers = new HashSet(other.shutdownMembers); - this.crashedMembers = new HashSet(other.crashedMembers); + this.shutdownMembers = new HashSet<>(other.shutdownMembers); + this.crashedMembers = new HashSet<>(other.crashedMembers); } public NetView(InternalDistributedMember creator, int viewId, List mbrs, Set shutdowns, @@ -123,7 +126,7 @@ public class NetView implements DataSerializableFixedID { this.creator = creator; this.viewId = viewId; this.members = mbrs; - this.hashedMembers = new HashSet(mbrs); + this.hashedMembers = new HashSet<>(mbrs); this.shutdownMembers = shutdowns; this.crashedMembers = crashes; this.failureDetectionPorts = new int[mbrs.size()+10]; @@ -165,7 +168,6 @@ public class NetView implements DataSerializableFixedID { /** * Transfer the failure-detection ports from another view to this one - * @param otherView */ public void setFailureDetectionPorts(NetView otherView) { int[] ports = otherView.getFailureDetectionPorts(); @@ -209,7 +211,7 @@ public class NetView implements DataSerializableFixedID { * return members that are i this view but not the given old view */ public List getNewMembers(NetView olderView) { - List result = new ArrayList(members); + List result = new ArrayList<>(members); result.removeAll(olderView.getMembers()); return result; } @@ -218,12 +220,9 @@ public class NetView implements DataSerializableFixedID { * return members added in this view */ public List getNewMembers() { - List result = new ArrayList(5); - for (InternalDistributedMember mbr : this.members) { - if (mbr.getVmViewId() == this.viewId) { - result.add(mbr); - } - } + List result = new ArrayList<>(5); + result.addAll(this.members.stream().filter(mbr -> mbr.getVmViewId() + == this.viewId).collect(Collectors.toList())); return result; } @@ -243,10 +242,6 @@ public class NetView implements DataSerializableFixedID { this.crashedMembers.addAll(mbr); } - public void addShutdownMembers(Set mbr) { - this.shutdownMembers.addAll(mbr); - } - public boolean remove(InternalDistributedMember mbr) { this.hashedMembers.remove(mbr); int idx = this.members.indexOf(mbr); @@ -259,12 +254,11 @@ public class NetView implements DataSerializableFixedID { public void removeAll(Collection ids) { this.hashedMembers.removeAll(ids); - for (InternalDistributedMember mbr: ids) { - remove(mbr); - } + ids.forEach(this::remove); } public boolean contains(DistributedMember mbr) { + assert mbr instanceof InternalDistributedMember; return this.hashedMembers.contains(mbr); } @@ -325,13 +319,13 @@ public class NetView implements DataSerializableFixedID { * And local member. * * @param filter Suspect member set. - * @param localAddress + * @param localAddress the address of this member * @param maxNumberDesired number of preferred coordinators to return * @return list of preferred coordinators */ public List getPreferredCoordinators(Set filter, InternalDistributedMember localAddress, int maxNumberDesired) { - List results = new ArrayList(); - List notPreferredCoordinatorList = new ArrayList(); + List results = new ArrayList<>(); + List notPreferredCoordinatorList = new ArrayList<>(); synchronized (members) { for (InternalDistributedMember addr : members) { @@ -449,13 +443,10 @@ public class NetView implements DataSerializableFixedID { */ public Set getActualCrashedMembers(NetView oldView) { Set result = new HashSet<>(this.crashedMembers.size()); - for (InternalDistributedMember mbr : this.crashedMembers) { - if ((mbr.getVmKind() != DistributionManager.ADMIN_ONLY_DM_TYPE)) { - if (oldView == null || oldView.contains(mbr)) { - result.add(mbr); - } - } - } + result.addAll(this.crashedMembers.stream() + .filter(mbr -> (mbr.getVmKind() != DistributionManager.ADMIN_ONLY_DM_TYPE)) + .filter(mbr -> oldView == null || oldView.contains(mbr)) + .collect(Collectors.toList())); return result; } @@ -554,14 +545,14 @@ public class NetView implements DataSerializableFixedID { } @Override - public synchronized boolean equals(Object arg0) { - if (arg0 == this) { + public synchronized boolean equals(Object other) { + if (other == this) { return true; } - if (!(arg0 instanceof NetView)) { + if (!(other instanceof NetView)) { return false; } - return this.members.equals(((NetView) arg0).getMembers()); + return this.members.equals(((NetView) other).getMembers()); } @Override @@ -584,7 +575,8 @@ public class NetView implements DataSerializableFixedID { creator = DataSerializer.readObject(in); viewId = in.readInt(); members = DataSerializer.readArrayList(in); - this.hashedMembers = new HashSet(members); + assert members != null; + this.hashedMembers = new HashSet<>(members); shutdownMembers = InternalDataSerializer.readHashSet(in); crashedMembers = InternalDataSerializer.readHashSet(in); failureDetectionPorts = DataSerializer.readIntArray(in); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5f840549/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java old mode 100755 new mode 100644 index 6478c70..d489431 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java @@ -26,11 +26,9 @@ import java.util.StringTokenizer; import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.GemFireConfigException; -import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes; import com.gemstone.gemfire.internal.SocketCreator; public class GMSUtil { - static Logger logger = Services.getLogger(); public static List parseLocators(String locatorsString, String bindAddress) { InetAddress addr = null; @@ -49,11 +47,11 @@ public class GMSUtil { public static List parseLocators(String locatorsString, InetAddress bindAddress) { - List result=new ArrayList(2); + List result= new ArrayList<>(2); String host; int port; boolean checkLoopback = (bindAddress != null); - boolean isLoopback = (checkLoopback? bindAddress.isLoopbackAddress() : false); + boolean isLoopback = (checkLoopback && bindAddress.isLoopbackAddress()); StringTokenizer parts=new StringTokenizer(locatorsString, ","); while(parts.hasMoreTokens()) { @@ -87,7 +85,7 @@ public class GMSUtil { result.add(isa); } catch(NumberFormatException e) { - // this shouldln't happen because the config has already been parsed and + // this shouldn't happen because the config has already been parsed and // validated } } @@ -100,7 +98,7 @@ public class GMSUtil { * given value */ public static String replaceStrings(String properties, String property, String value) { - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); int start = 0; int index = properties.indexOf(property); while (index != -1) { @@ -136,24 +134,4 @@ public class GMSUtil { } - /** compareTo for InetAddresses */ - public static int compareAddresses(InetAddress one, InetAddress two) { - byte[] oneBytes = one.getAddress(); - byte[] twoBytes = two.getAddress(); - - if (oneBytes != twoBytes) { - for (int i = 0; i < oneBytes.length; i++) { - if (i >= twoBytes.length) - return -1; // same as far as they go, but shorter... - if (oneBytes[i] < twoBytes[i]) - return -1; - if (oneBytes[i] > twoBytes[i]) - return 1; - } - if (oneBytes.length > twoBytes.length) - return 1; // same as far as they go, but longer... - } - return 0; - } - } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5f840549/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java old mode 100755 new mode 100644 index 2a41574..6df0c7b --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java @@ -30,21 +30,21 @@ public class ServiceConfig { public static final long MEMBER_REQUEST_COLLECTION_INTERVAL = Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "member-request-collection-interval", 300); /** various settings from Geode configuration */ - private long joinTimeout; - private int[] membershipPortRange; - private int udpRecvBufferSize; - private int udpSendBufferSize; - private long memberTimeout; + private final long joinTimeout; + private final int[] membershipPortRange; + private final int udpRecvBufferSize; + private final int udpSendBufferSize; + private final long memberTimeout; private Integer lossThreshold; - private Integer memberWeight; + private final Integer memberWeight; private boolean networkPartitionDetectionEnabled; - private int locatorWaitTime; + private final int locatorWaitTime; /** the configuration for the distributed system */ - private DistributionConfig dconfig; + private final DistributionConfig dconfig; /** the transport config from the distribution manager */ - private RemoteTransportConfig transport; + private final RemoteTransportConfig transport; public int getLocatorWaitTime() { @@ -62,16 +62,6 @@ public class ServiceConfig { } - public int getUdpRecvBufferSize() { - return udpRecvBufferSize; - } - - - public int getUdpSendBufferSize() { - return udpSendBufferSize; - } - - public long getMemberTimeout() { return memberTimeout; } @@ -95,30 +85,6 @@ public class ServiceConfig { this.networkPartitionDetectionEnabled = enabled; } - /** - * returns the address that will be used by the DirectChannel to - * identify this member - */ - public InetAddress getInetAddress() { - String bindAddress = this.dconfig.getBindAddress(); - - try { - /* note: had to change the following to make sure the prop wasn't empty - in addition to not null for admin.DistributedSystemFactory */ - if (bindAddress != null && bindAddress.length() > 0) { - return InetAddress.getByName(bindAddress); - - } - else { - return SocketCreator.getLocalHost(); - } - } - catch (java.net.UnknownHostException unhe) { - throw new RuntimeException(unhe); - - } - } - public boolean areLocatorsPreferredAsCoordinators() { boolean locatorsAreCoordinators = false; @@ -151,7 +117,7 @@ public class ServiceConfig { this.transport = transport; long defaultJoinTimeout = 24000; - if (theConfig.getLocators().length() > 0 && !Locator.hasLocators()) { + if (theConfig.getLocators().length() > 0 && !Locator.hasLocator()) { defaultJoinTimeout = 60000; } @@ -160,7 +126,7 @@ public class ServiceConfig { long minimumJoinTimeout = dconfig.getMemberTimeout() * 2 + MEMBER_REQUEST_COLLECTION_INTERVAL; if (defaultJoinTimeout < minimumJoinTimeout) { defaultJoinTimeout = minimumJoinTimeout; - }; + } joinTimeout = Long.getLong("p2p.joinTimeout", defaultJoinTimeout).longValue(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5f840549/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java index 7d1f2d9..5bc006f 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java @@ -41,6 +41,7 @@ import org.apache.logging.log4j.Logger; import java.util.Timer; +@SuppressWarnings("ConstantConditions") public class Services { private static final Logger logger = LogService.getLogger(); @@ -66,7 +67,7 @@ public class Services { private InternalLogWriter logWriter; private InternalLogWriter securityLogWriter; - private Timer timer = new Timer("Geode Membership Timer", true); + private final Timer timer = new Timer("Geode Membership Timer", true); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5f840549/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java index 4b63d4f..9fdbb64 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java @@ -19,7 +19,6 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.fd; import static com.gemstone.gemfire.internal.DataSerializableFixedID.HEARTBEAT_REQUEST; import static com.gemstone.gemfire.internal.DataSerializableFixedID.HEARTBEAT_RESPONSE; import static com.gemstone.gemfire.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE; -import static com.sun.corba.se.impl.naming.cosnaming.NamingUtils.debug; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -41,6 +40,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.*; import org.apache.logging.log4j.Logger; import org.jgroups.util.UUID; @@ -49,7 +49,6 @@ import com.gemstone.gemfire.CancelException; import com.gemstone.gemfire.GemFireConfigException; import com.gemstone.gemfire.SystemConnectException; import com.gemstone.gemfire.distributed.DistributedMember; -import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; import com.gemstone.gemfire.distributed.internal.DMStats; import com.gemstone.gemfire.distributed.internal.DistributionMessage; import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; @@ -87,6 +86,7 @@ import com.gemstone.gemfire.internal.Version; * for that member. * * */ +@SuppressWarnings({ "SynchronizationOnLocalVariableOrMethodParameter", "NullableProblems" }) public class GMSHealthMonitor implements HealthMonitor, MessageHandler { private Services services; @@ -98,7 +98,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { private final AtomicInteger requestId = new AtomicInteger(); /** membership logger */ - private static Logger logger = Services.getLogger(); + private static final Logger logger = Services.getLogger(); /** * The number of recipients of periodic heartbeats. The recipients will @@ -116,7 +116,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { /** stall time to wait for members leaving concurrently */ public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL = Long.getLong("geode.suspect-member-collection-interval", 200); - volatile long currentTimeStamp; + private volatile long currentTimeStamp; /** this member's ID */ private InternalDistributedMember localAddress; @@ -156,10 +156,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { private ScheduledFuture monitorFuture; /** test hook */ - volatile boolean playingDead = false; + private volatile boolean playingDead = false; /** test hook */ - volatile boolean beingSick = false; + private volatile boolean beingSick = false; // For TCP check private ExecutorService serverSocketExecutor; @@ -169,7 +169,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { private volatile ServerSocket serverSocket; /** Statistics about health monitor */ - protected DMStats stats; + private DMStats stats; /** * this class is to avoid garbage @@ -261,7 +261,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { class ClientSocketHandler implements Runnable { - private Socket socket; + private final Socket socket; public ClientSocketHandler(Socket socket) { this.socket = socket; @@ -272,7 +272,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { socket.setTcpNoDelay(true); DataInputStream in = new DataInputStream(socket.getInputStream()); OutputStream out = socket.getOutputStream(); - @SuppressWarnings("unused") + @SuppressWarnings("UnusedAssignment") short version = in.readShort(); int vmViewId = in.readInt(); long uuidLSBs = in.readLong(); @@ -337,6 +337,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } + @SuppressWarnings("EmptyMethod") public static void loadEmergencyClasses() { } @@ -380,27 +381,23 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { setNextNeighbor(cv, mbr); // we need to check this member - checkExecutor.execute(new Runnable() { - - @Override - public void run() { - boolean pinged = false; - try { - pinged = GMSHealthMonitor.this.doCheckMember(mbr, true); - } catch (CancelException e) { - return; - } - - if (!pinged) { - suspectedMemberInView.put(mbr, currentView); - String reason = "Member isn't responding to heartbeat requests"; - GMSHealthMonitor.this.initiateSuspicion(mbr, reason); - } else { - logger.trace("Setting next neighbor as member {} has responded.", mbr); - suspectedMemberInView.remove(mbr); - // back to previous one - setNextNeighbor(GMSHealthMonitor.this.currentView, null); - } + checkExecutor.execute(() -> { + boolean pinged = false; + try { + pinged = GMSHealthMonitor.this.doCheckMember(mbr, true); + } catch (CancelException e) { + return; + } + + if (!pinged) { + suspectedMemberInView.put(mbr, currentView); + String reason = "Member isn't responding to heartbeat requests"; + GMSHealthMonitor.this.initiateSuspicion(mbr, reason); + } else { + logger.trace("Setting next neighbor as member {} has responded.", mbr); + suspectedMemberInView.remove(mbr); + // back to previous one + setNextNeighbor(GMSHealthMonitor.this.currentView, null); } }); @@ -411,7 +408,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { return; } SuspectRequest sr = new SuspectRequest(mbr, reason); - List sl = new ArrayList(); + List sl = new ArrayList<>(); sl.add(sr); sendSuspectRequest(sl); } @@ -586,17 +583,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } public void start() { - scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread th = new Thread(Services.getThreadGroup(), r, "Geode Failure Detection Scheduler"); - th.setDaemon(true); - return th; - } + scheduler = Executors.newScheduledThreadPool(1, r -> { + Thread th = new Thread(Services.getThreadGroup(), r, "Geode Failure Detection Scheduler"); + th.setDaemon(true); + return th; }); checkExecutor = Executors.newCachedThreadPool(new ThreadFactory() { - AtomicInteger threadIdx = new AtomicInteger(); + final AtomicInteger threadIdx = new AtomicInteger(); @Override public Thread newThread(Runnable r) { @@ -619,10 +613,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { // } // }, MEMBER_SUSPECT_COLLECTION_INTERVAL); // suspectRequestCollectorThread.setDaemon(true); -// suspectRequestCollectorThread.start(); +// suspectRequestCollectorThread.start() serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() { - AtomicInteger threadIdx = new AtomicInteger(); + final AtomicInteger threadIdx = new AtomicInteger(); @Override public Thread newThread(Runnable r) { @@ -636,14 +630,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) { - ServerSocket serverSocket = null; + ServerSocket serverSocket; try { serverSocket = SocketCreator.getDefaultInstance().createServerSocketUsingPortRange(socketAddress, 50/*backlog*/, true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange, false); socketPort = serverSocket.getLocalPort(); - } catch (IOException e) { - throw new GemFireConfigException("Unable to allocate a failure detection port in the membership-port range", e); - } catch (SystemConnectException e) { + } catch (IOException | SystemConnectException e) { throw new GemFireConfigException("Unable to allocate a failure detection port in the membership-port range", e); } return serverSocket; @@ -657,48 +649,45 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { // allocate a socket here so there are no race conditions between knowing the FD // socket port and joining the system - serverSocketExecutor.execute(new Runnable() { - @Override - public void run() { - logger.info("Started failure detection server thread on {}:{}.", ssocket.getInetAddress(), socketPort); - Socket socket = null; - try { - while (!services.getCancelCriterion().isCancelInProgress() - && !GMSHealthMonitor.this.isStopping) { + serverSocketExecutor.execute(() -> { + logger.info("Started failure detection server thread on {}:{}.", ssocket.getInetAddress(), socketPort); + Socket socket = null; + try { + while (!services.getCancelCriterion().isCancelInProgress() + && !GMSHealthMonitor.this.isStopping) { + try { + socket = ssocket.accept(); + if (GMSHealthMonitor.this.playingDead) { + continue; + } + serverSocketExecutor.execute(new ClientSocketHandler(socket)); //start(); [bruce] I'm seeing a lot of failures due to this thread not being created fast enough, sometimes as long as 30 seconds + + } catch (RejectedExecutionException e) { + // this can happen during shutdown + + } catch (IOException e) { + if (!isStopping) { + logger.trace("Unexpected exception", e); + } try { - socket = ssocket.accept(); - if (GMSHealthMonitor.this.playingDead) { - continue; - } - serverSocketExecutor.execute(new ClientSocketHandler(socket)); //start(); [bruce] I'm seeing a lot of failures due to this thread not being created fast enough, sometimes as long as 30 seconds - - } catch (RejectedExecutionException e) { - // this can happen during shutdown - - } catch (IOException e) { - if (!isStopping) { - logger.trace("Unexpected exception", e); - } - try { - if (socket != null) { - socket.close(); - } - } catch (IOException ioe) { - logger.trace("Unexpected exception", ioe); + if (socket != null) { + socket.close(); } + } catch (IOException ioe) { + logger.trace("Unexpected exception", ioe); } } - logger.info("GMSHealthMonitor server thread exiting"); - } finally { - // close the server socket - if (ssocket != null && !ssocket.isClosed()) { - try { - ssocket.close(); - serverSocket = null; - logger.info("GMSHealthMonitor server socket closed."); - } catch (IOException e) { - logger.debug("Unexpected exception", e); - } + } + logger.info("GMSHealthMonitor server thread exiting"); + } finally { + // close the server socket + if (!ssocket.isClosed()) { + try { + ssocket.close(); + serverSocket = null; + logger.info("GMSHealthMonitor server socket closed."); + } catch (IOException e) { + logger.debug("Unexpected exception", e); } } } @@ -829,7 +818,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { List allMembers = newView.getMembers(); - Set checkAllSuspected = new HashSet<>(allMembers); + Set checkAllSuspected = new HashSet<>(allMembers); checkAllSuspected.removeAll(suspectedMemberInView.keySet()); checkAllSuspected.remove(localAddress); if (checkAllSuspected.isEmpty() && allMembers.size() > 1) { @@ -899,8 +888,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } Collection val = requestIdVsResponse.values(); - for (Iterator it = val.iterator(); it.hasNext();) { - Response r = it.next(); + for (Response r : val) { synchronized (r) { r.notify(); } @@ -1066,8 +1054,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { * to become the new membership coordinator. * it will to final check on that member and then it will send remove request * for that member - * - * @param incomingRequest */ private void processSuspectMembersRequest(SuspectMembersMessage incomingRequest) { @@ -1118,13 +1104,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { else { NetView check = new NetView(cv, cv.getViewId() + 1); - ArrayList smbr = new ArrayList(); + ArrayList smbr = new ArrayList<>(); synchronized (viewVsSuspectedMembers) { recordSuspectRequests(sMembers, cv); Set viewVsMembers = viewVsSuspectedMembers.get(cv); - Iterator itr = viewVsMembers.iterator(); - while (itr.hasNext()) { - SuspectRequest sr = itr.next(); + for (final SuspectRequest sr : viewVsMembers) { check.remove(sr.getSuspectMember()); smbr.add(sr); } @@ -1148,16 +1132,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { /*** * This method make sure that records suspectRequest. We need to make sure this * on preferred coordinators, as elder coordinator might be in suspected list next. - * @param sMembers - * @param cv */ private void recordSuspectRequests(List sMembers, NetView cv) { // record suspect requests - Set viewVsMembers = null; + Set viewVsMembers; synchronized (viewVsSuspectedMembers) { viewVsMembers = viewVsSuspectedMembers.get(cv); if (viewVsMembers == null) { - viewVsMembers = new HashSet(); + viewVsMembers = new HashSet<>(); viewVsSuspectedMembers.put(cv, viewVsMembers); } for (SuspectRequest sr: sMembers) { @@ -1171,16 +1153,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { * socket information is available for the member (in the view) then * we attempt to connect to its socket and ask if it's the expected member. * Otherwise we send a heartbeat request and wait for a reply. - * - * @param initiator - * @param sMembers - * @param cv */ private void checkIfAvailable(final InternalDistributedMember initiator, List sMembers, final NetView cv) { - for (int i = 0; i < sMembers.size(); i++) { - final SuspectRequest sr = sMembers.get(i); + for (final SuspectRequest sr : sMembers) { final InternalDistributedMember mbr = sr.getSuspectMember(); if (!cv.contains(mbr) || membersInFinalCheck.contains(mbr)) { @@ -1203,25 +1180,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { final String reason = sr.getReason(); logger.debug("Scheduling final check for member {}; reason={}", mbr, reason); // its a coordinator - checkExecutor.execute(new Runnable() { - - @Override - public void run() { - try { - inlineCheckIfAvailable(initiator, cv, true, mbr, - reason); - } catch (DistributedSystemDisconnectedException e) { - return; - } catch (CancelException e) { - // shutting down - } catch (Exception e) { - logger.info("Unexpected exception while verifying member", e); - } finally { - GMSHealthMonitor.this.suspectedMemberInView.remove(mbr); - } + checkExecutor.execute(() -> { + try { + inlineCheckIfAvailable(initiator, cv, true, mbr, reason); + } catch (CancelException e) { + // shutting down + } catch (Exception e) { + logger.info("Unexpected exception while verifying member", e); + } finally { + GMSHealthMonitor.this.suspectedMemberInView.remove(mbr); } - - }); // }// scheduling for final check and removing it.. } @@ -1317,13 +1285,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { logger.debug("Sending suspect request for members {}", requests); List recipients; if (currentView.size() > 4) { - HashSet filter = new HashSet(); + HashSet filter = new HashSet<>(); for (Enumeration e = suspectedMemberInView.keys(); e.hasMoreElements();) { filter.add(e.nextElement()); } - for (int i = 0; i < requests.size(); i++) { - filter.add(requests.get(i).getSuspectMember()); - } + filter.addAll(requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList())); recipients = currentView.getPreferredCoordinators(filter, services.getJoinLeave().getMemberID(), 5); } else { recipients = currentView.getMembers(); @@ -1344,9 +1310,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } private static class ConnectTimeoutTask extends TimerTask implements ConnectionWatcher { - Timer scheduler; + final Timer scheduler; Socket socket; - long timeout; + final long timeout; ConnectTimeoutTask(Timer scheduler, long timeout) { this.scheduler = scheduler; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5f840549/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java old mode 100755 new mode 100644 index aab9002..1065214 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java @@ -72,7 +72,7 @@ public class GMSLocator implements Locator, NetLocator { private final LocatorStats stats; private InternalDistributedMember localAddress; - private Set registrants = new HashSet(); + private final Set registrants = new HashSet<>(); /** * The current membership view, or one recovered from disk. @@ -99,7 +99,7 @@ public class GMSLocator implements Locator, NetLocator { this.networkPartitionDetectionEnabled = networkPartitionDetectionEnabled; this.locatorString = locatorString; if (this.locatorString == null || this.locatorString.length() == 0) { - this.locators = new ArrayList(0); + this.locators = new ArrayList<>(0); } else { this.locators = GMSUtil.parseLocators(locatorString, bindAddress); } @@ -178,7 +178,6 @@ public class GMSLocator implements Locator, NetLocator { } boolean fromView = false; - int viewId = -1; NetView v = this.view; if (v != null) { @@ -193,10 +192,10 @@ public class GMSLocator implements Locator, NetLocator { break; } } - viewId = v.getViewId(); + int viewId = v.getViewId(); if (viewId > findRequest.getLastViewId()) { // ignore the requests rejectedCoordinators if the view has changed - coord = v.getCoordinator(Collections.emptyList()); + coord = v.getCoordinator(Collections.emptyList()); } else { coord = v.getCoordinator(findRequest.getRejectedCoordinators()); } @@ -229,7 +228,7 @@ public class GMSLocator implements Locator, NetLocator { synchronized(registrants) { response = new FindCoordinatorResponse(coord, localAddress, - fromView, view, new HashSet(registrants), + fromView, view, new HashSet<>(registrants), this.networkPartitionDetectionEnabled, this.usePreferredCoordinators); } } @@ -240,7 +239,7 @@ public class GMSLocator implements Locator, NetLocator { return response; } - public void saveView(NetView view) { + private void saveView(NetView view) { if (viewFile == null) { return; } @@ -287,10 +286,10 @@ public class GMSLocator implements Locator, NetLocator { // test hook public List getMembers() { if (view != null) { - return new ArrayList(view.getMembers()); + return new ArrayList<>(view.getMembers()); } else { synchronized(registrants) { - return new ArrayList(registrants); + return new ArrayList<>(registrants); } } } @@ -301,7 +300,7 @@ public class GMSLocator implements Locator, NetLocator { setMembershipManager(((InternalDistributedSystem)ds).getDM().getMembershipManager()); } - public void recover() throws InternalGemFireException { + private void recover() throws InternalGemFireException { if (!recoverFromOthers()) { recoverFromFile(viewFile); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5f840549/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java index 4b82fa0..58b794a 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java @@ -162,7 +162,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { /** * collects the response to a join request */ - private JoinResponseMessage[] joinResponse = new JoinResponseMessage[1]; + private final JoinResponseMessage[] joinResponse = new JoinResponseMessage[1]; /** * collects responses to new views @@ -227,7 +227,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { int locatorsContacted = 0; boolean hasContactedAJoinedLocator; NetView view; - Set responses = new HashSet<>(); + final Set responses = new HashSet<>(); void cleanup() { alreadyTried.clear(); @@ -366,7 +366,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { services.getMessenger().send(req); } - JoinResponseMessage response = null; + JoinResponseMessage response; try { response = waitForJoinResponse(); } catch (InterruptedException e) { @@ -393,7 +393,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } //there is no way we can rech here right now - throw new RuntimeException("Join Request Failed with response " + joinResponse ); + throw new RuntimeException("Join Request Failed with response " + joinResponse[0] ); } private JoinResponseMessage waitForJoinResponse() throws InterruptedException { @@ -474,7 +474,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { return; } Object creds = incomingRequest.getCredentials(); - String rejection = null; + String rejection; try { rejection = services.getAuthenticator().authenticate(incomingRequest.getMemberID(), (Properties)creds); } catch (Exception e) { @@ -699,7 +699,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { startViewBroadcaster(); } else { // create and send out a new view - NetView newView = addMemberToNetView(this.currentView, oldCoordinator); + NetView newView = addMemberToNetView(oldCoordinator); createAndStartViewCreator(newView); startViewBroadcaster(); } @@ -717,9 +717,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } - private NetView addMemberToNetView(NetView netView, InternalDistributedMember oldCoordinator) { + private NetView addMemberToNetView(InternalDistributedMember oldCoordinator) { boolean testing = unitTesting.contains("noRandomViewChange"); - NetView newView = null; + NetView newView; Set leaving = new HashSet<>(); Set removals; synchronized (viewInstallationLock) { @@ -758,7 +758,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } - private void sendRemoveMessages(List removals, List reasons, NetView newView, Set oldIds) { + private void sendRemoveMessages(List removals, List reasons, Set oldIds) { Iterator reason = reasons.iterator(); for (InternalDistributedMember mbr : removals) { //if olds not contains mbr then send remove request @@ -772,15 +772,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } boolean prepareView(NetView view, List newMembers) throws InterruptedException { - return sendView(view, newMembers, true, this.prepareProcessor); + return sendView(view, true, this.prepareProcessor); } void sendView(NetView view, List newMembers) throws InterruptedException { - sendView(view, newMembers, false, this.viewProcessor); + sendView(view, false, this.viewProcessor); } - private boolean sendView(NetView view, List newMembers, boolean preparing, - ViewReplyProcessor viewReplyProcessor) throws InterruptedException { + private boolean sendView(NetView view, boolean preparing, ViewReplyProcessor viewReplyProcessor) throws InterruptedException { int id = view.getViewId(); InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(this.localAddress), preparing); @@ -1428,8 +1427,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { @Override public void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect, String reason) { - prepareProcessor.memberSuspected(initiator, suspect); - viewProcessor.memberSuspected(initiator, suspect); + prepareProcessor.memberSuspected(suspect); + viewProcessor.memberSuspected(suspect); } @Override @@ -1440,7 +1439,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { stopCoordinatorServices(); if (view != null) { if (view.size() > 1) { - List coords = view.getPreferredCoordinators(Collections.emptySet(), localAddress, 5); + List coords = view.getPreferredCoordinators(Collections.emptySet(), localAddress, 5); logger.debug("Sending my leave request to {}", coords); LeaveRequestMessage m = new LeaveRequestMessage(coords, this.localAddress, "this member is shutting down"); services.getMessenger().send(m); @@ -1463,7 +1462,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { processRemoveRequest(msg); if (!this.isCoordinator) { msg.resetRecipients(); - msg.setRecipients(v.getPreferredCoordinators(Collections. emptySet(), localAddress, 10)); + msg.setRecipients(v.getPreferredCoordinators(Collections.emptySet(), localAddress, 10)); services.getMessenger().send(msg); } } else { @@ -1642,7 +1641,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } - synchronized void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect) { + synchronized void memberSuspected(InternalDistributedMember suspect) { if (waiting) { // we will do a final check on this member if it hasn't already // been done, so stop waiting for it now @@ -1707,7 +1706,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } Set waitForResponses() throws InterruptedException { - Set result = null; + Set result; long endOfWait = System.currentTimeMillis() + viewAckTimeout; try { while (System.currentTimeMillis() < endOfWait && (!services.getCancelCriterion().isCancelInProgress())) { @@ -1797,7 +1796,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { /** * initial joining members. guarded by synch on ViewCreator */ - List initialJoins = Collections.emptyList(); + List initialJoins = Collections.emptyList(); /** * initial leaving members guarded by synch on ViewCreator */ @@ -2041,7 +2040,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { for (DistributionMessage msg : requests) { logger.debug("processing request {}", msg); - InternalDistributedMember mbr = null; + InternalDistributedMember mbr; switch (msg.getDSFID()) { case JOIN_REQUEST: JoinRequestMessage jmsg = (JoinRequestMessage) msg; @@ -2085,10 +2084,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { removalReqs.add(mbr); removalReasons.add(((RemoveMemberMessage) msg).getReason()); } else { - sendRemoveMessages(Collections.singletonList(mbr), - Collections.singletonList(((RemoveMemberMessage) msg).getReason()), - currentView, - new HashSet()); + sendRemoveMessages(Collections.singletonList(mbr), + Collections.singletonList(((RemoveMemberMessage) msg).getReason()), new HashSet()); } } break; @@ -2150,7 +2147,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } // send removal messages before installing the view so we stop // getting messages from members that have been kicked out - sendRemoveMessages(removalReqs, removalReasons, newView, oldIDs); + sendRemoveMessages(removalReqs, removalReasons, oldIDs); prepareAndSendView(newView, joinReqs, leaveReqs, newView.getCrashedMembers()); @@ -2163,7 +2160,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { */ void prepareAndSendView(NetView newView, List joinReqs, Set leaveReqs, Set removalReqs) throws InterruptedException { - boolean prepared = false; + boolean prepared; do { if (this.shutdown || Thread.currentThread().isInterrupted()) { return; @@ -2255,7 +2252,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { for (int i = 0; i < size; i++) { reasons.add("Failed to acknowledge a new membership view and then failed tcp/ip connection attempt"); } - sendRemoveMessages(failures, reasons, newView, new HashSet()); + sendRemoveMessages(failures, reasons, new HashSet()); } // if there is no conflicting view then we can count http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5f840549/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java index 1f8ed2e..63f4aa1 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java @@ -48,13 +48,13 @@ public class GMSQuorumChecker implements QuorumChecker { private Set receivedAcks; - private NetView lastView; + private final NetView lastView; // guarded by this private boolean quorumAchieved = false; - private JChannel channel; + private final JChannel channel; private JGAddress myAddress; - private int partitionThreshold; + private final int partitionThreshold; public GMSQuorumChecker(NetView jgView, int partitionThreshold, JChannel channel) { this.lastView = jgView; @@ -63,7 +63,7 @@ public class GMSQuorumChecker implements QuorumChecker { } public void initialize() { - receivedAcks = new ConcurrentHashSet(); + receivedAcks = new ConcurrentHashSet<>(); pingPonger = new GMSPingPonger(); // UUID logicalAddress = (UUID) channel.getAddress(); @@ -72,7 +72,7 @@ public class GMSQuorumChecker implements QuorumChecker { // myAddress = new JGAddress(logicalAddress, ipaddr); myAddress = (JGAddress)channel.down(new Event(Event.GET_LOCAL_ADDRESS)); - addressConversionMap = new ConcurrentHashMap(this.lastView.size()); + addressConversionMap = new ConcurrentHashMap<>(this.lastView.size()); List members = this.lastView.getMembers(); for (InternalDistributedMember addr : members) { SocketAddress sockaddr = new InetSocketAddress(addr.getNetMember().getInetAddress(), addr.getPort()); @@ -92,15 +92,11 @@ public class GMSQuorumChecker implements QuorumChecker { if (isDebugEnabled) { logger.debug("beginning quorum check with {}", this); } - try { - sendPingMessages(); - quorumAchieved = waitForResponses(lastView.getMembers().size(), timeout); - // If we did not achieve full quorum, calculate if we achieved quorum - if (!quorumAchieved) { - quorumAchieved = calculateQuorum(); - } - } finally { - + sendPingMessages(); + quorumAchieved = waitForResponses(lastView.getMembers().size(), timeout); + // If we did not achieve full quorum, calculate if we achieved quorum + if (!quorumAchieved) { + quorumAchieved = calculateQuorum(); } return quorumAchieved; } @@ -209,18 +205,15 @@ public class GMSQuorumChecker implements QuorumChecker { @Override public void receive(Message msg) { - Object contents = msg.getBuffer(); - if (contents instanceof byte[]) { - byte[] msgBytes = (byte[]) contents; - if (pingPonger.isPingMessage(msgBytes)) { - try { - pingPonger.sendPongMessage(channel, myAddress, msg.getSrc()); - } catch (Exception e) { - logger.debug("Failed sending Pong message to " + msg.getSrc()); - } - } else if (pingPonger.isPongMessage(msgBytes)) { - pongReceived(msg.getSrc()); + byte[] msgBytes = msg.getBuffer(); + if (pingPonger.isPingMessage(msgBytes)) { + try { + pingPonger.sendPongMessage(channel, myAddress, msg.getSrc()); + } catch (Exception e) { + logger.debug("Failed sending Pong message to " + msg.getSrc()); } + } else if (pingPonger.isPongMessage(msgBytes)) { + pongReceived(msg.getSrc()); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5f840549/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java index 879b0a5..a119bb5 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java @@ -59,11 +59,13 @@ import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.*; import static com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil.replaceStrings; import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST; import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE; +@SuppressWarnings("StatementWithEmptyBody") public class JGroupsMessenger implements Messenger { private static final Logger logger = Services.getLogger(); @@ -79,26 +81,26 @@ public class JGroupsMessenger implements Messenger { private static final String JGROUPS_MCAST_CONFIG_FILE_NAME = "com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml"; /** JG magic numbers for types added to the JG ClassConfigurator */ - public static final short JGROUPS_TYPE_JGADDRESS = 2000; - public static final short JGROUPS_PROTOCOL_TRANSPORT = 1000; + private static final short JGROUPS_TYPE_JGADDRESS = 2000; + private static final short JGROUPS_PROTOCOL_TRANSPORT = 1000; public static boolean THROW_EXCEPTION_ON_START_HOOK; - String jgStackConfig; + private String jgStackConfig; JChannel myChannel; InternalDistributedMember localAddress; JGAddress jgAddress; - Services services; + private Services services; /** handlers that receive certain classes of messages instead of the Manager */ - Map handlers = new ConcurrentHashMap(); + private final Map handlers = new ConcurrentHashMap<>(); private volatile NetView view; - private GMSPingPonger pingPonger = new GMSPingPonger(); + private final GMSPingPonger pingPonger = new GMSPingPonger(); - protected AtomicLong pongsReceived = new AtomicLong(0); + protected final AtomicLong pongsReceived = new AtomicLong(0); /** * A set that contains addresses that we have logged JGroups IOExceptions for in the @@ -106,7 +108,7 @@ public class JGroupsMessenger implements Messenger { * reduces the amount of suspect processing initiated by IOExceptions and the * amount of exceptions logged */ - private Set
addressesWithIoExceptionsProcessed = Collections.synchronizedSet(new HashSet
()); + private final Set
addressesWithIoExceptionsProcessed = Collections.synchronizedSet(new HashSet
()); static { // register classes that we've added to jgroups that are put on the wire @@ -132,9 +134,9 @@ public class JGroupsMessenger implements Messenger { } System.setProperty("jgroups.resolve_dns", String.valueOf(!b)); - InputStream is= null; + InputStream is; - String r = null; + String r; if (transport.isMcastEnabled()) { r = JGROUPS_MCAST_CONFIG_FILE_NAME; } else { @@ -149,7 +151,7 @@ public class JGroupsMessenger implements Messenger { try { //PlainConfigurator config = PlainConfigurator.getInstance(is); //properties = config.getProtocolStackString(); - StringBuffer sb = new StringBuffer(3000); + StringBuilder sb = new StringBuilder(3000); BufferedReader br; br = new BufferedReader(new InputStreamReader(is, "US-ASCII")); String input; @@ -238,7 +240,7 @@ public class JGroupsMessenger implements Messenger { myChannel = (JChannel)oldChannel; // scrub the old channel ViewId vid = new ViewId(new JGAddress(), 0); - View jgv = new View(vid, new ArrayList
()); + View jgv = new View(vid, new ArrayList<>()); this.myChannel.down(new Event(Event.VIEW_CHANGE, jgv)); UUID logicalAddress = (UUID)myChannel.getAddress(); if (logicalAddress instanceof JGAddress) { @@ -334,12 +336,10 @@ public class JGroupsMessenger implements Messenger { if (this.jgAddress.getVmViewId() < 0) { this.jgAddress.setVmViewId(this.localAddress.getVmViewId()); } - List mbrs = new ArrayList(v.size()); - for (InternalDistributedMember idm: v.getMembers()) { - mbrs.add(new JGAddress(idm)); - } + List mbrs = new ArrayList<>(v.size()); + mbrs.addAll(v.getMembers().stream().map(JGAddress::new).collect(Collectors.toList())); ViewId vid = new ViewId(new JGAddress(v.getCoordinator()), v.getViewId()); - View jgv = new View(vid, new ArrayList
(mbrs)); + View jgv = new View(vid, new ArrayList<>(mbrs)); logger.trace("installing JGroups view: {}", jgv); this.myChannel.down(new Event(Event.VIEW_CHANGE, jgv)); @@ -353,6 +353,7 @@ public class JGroupsMessenger implements Messenger { * recipient.

* see Transport._send() */ + @SuppressWarnings("UnusedParameters") public void handleJGroupsIOException(IOException e, Address dest) { if (services.getManager().shutdownInProgress()) { // GEODE-634 - don't log IOExceptions during shutdown return; @@ -473,6 +474,7 @@ public class JGroupsMessenger implements Messenger { long pongsSnapshot = pongsReceived.longValue(); JGAddress dest = null; try { + //noinspection ConstantConditions pingPonger.sendPingMessage(myChannel, jgAddress, dest); } catch (Exception e) { logger.warn("unable to send multicast message: {}", (jgAddress==null? "multicast recipients":jgAddress), @@ -538,9 +540,7 @@ public class JGroupsMessenger implements Messenger { long now = System.currentTimeMillis(); if (!warned && now >= warnTime) { warned = true; - if (senderSeqnos != null) { - received = String.valueOf(senderSeqnos[0]); - } + received = String.valueOf(senderSeqnos[0]); logger.warn("{} seconds have elapsed while waiting for multicast messages from {}. Received {} but expecting at least {}.", Long.toString((warnTime-startTime)/1000L), sender, received, seqno); } @@ -561,7 +561,7 @@ public class JGroupsMessenger implements Messenger { return send(msg, true); } - public Set send(DistributionMessage msg, boolean reliably) { + private Set send(DistributionMessage msg, boolean reliably) { // perform the same jgroups messaging as in 8.2's GMSMembershipManager.send() method @@ -608,7 +608,7 @@ public class JGroupsMessenger implements Messenger { Message jmsg = createJGMessage(msg, local, Version.CURRENT_ORDINAL); theStats.endMsgSerialization(startSer); - Exception problem = null; + Exception problem; try { jmsg.setTransientFlag(TransientFlag.DONT_LOOPBACK); if (!reliably) { @@ -655,7 +655,7 @@ public class JGroupsMessenger implements Messenger { // Construct the list calculatedLen = v.size(); - calculatedMembers = new LinkedList(); + calculatedMembers = new LinkedList<>(); for (int i = 0; i < calculatedLen; i ++) { InternalDistributedMember m = (InternalDistributedMember)v.get(i); calculatedMembers.add((GMSMember)m.getNetMember()); @@ -663,7 +663,7 @@ public class JGroupsMessenger implements Messenger { } // send to all else { // send to explicit list calculatedLen = len; - calculatedMembers = new LinkedList(); + calculatedMembers = new LinkedList<>(); for (int i = 0; i < calculatedLen; i ++) { calculatedMembers.add((GMSMember)destinations[i].getNetMember()); } @@ -671,10 +671,9 @@ public class JGroupsMessenger implements Messenger { Int2ObjectOpenHashMap messages = new Int2ObjectOpenHashMap<>(); long startSer = theStats.startMsgSerialization(); boolean firstMessage = true; - for (Iterator it=calculatedMembers.iterator(); it.hasNext(); ) { - GMSMember mbr = it.next(); + for (GMSMember mbr : calculatedMembers) { short version = mbr.getVersionOrdinal(); - if ( !messages.containsKey(version) ) { + if (!messages.containsKey(version)) { Message jmsg = createJGMessage(msg, local, version); messages.put(version, jmsg); if (firstMessage) { @@ -689,7 +688,7 @@ public class JGroupsMessenger implements Messenger { for (GMSMember mbr: calculatedMembers) { JGAddress to = new JGAddress(mbr); short version = mbr.getVersionOrdinal(); - Message jmsg = (Message)messages.get(version); + Message jmsg = messages.get(version); Exception problem = null; try { Message tmp = (i < (calculatedLen-1)) ? jmsg.copy(true) : jmsg; @@ -732,11 +731,10 @@ public class JGroupsMessenger implements Messenger { if (msg.forAll()) { return Collections.emptySet(); } - Set result = new HashSet(); + Set result = new HashSet<>(); NetView newView = this.view; if (newView != null && newView != oldView) { - for (int i = 0; i < destinations.length; i ++) { - InternalDistributedMember d = destinations[i]; + for (InternalDistributedMember d : destinations) { if (!newView.contains(d)) { logger.debug("messenger: member has left the view: {} view is now {}", d, newView); result.add(d); @@ -829,7 +827,7 @@ public class JGroupsMessenger implements Messenger { return null; } - InternalDistributedMember sender = null; + InternalDistributedMember sender; Exception problem = null; byte[] buf = jgmsg.getRawBuffer(); @@ -916,12 +914,10 @@ public class JGroupsMessenger implements Messenger { try { Digest digest = new Digest(); digest.readFrom(dis); - if (digest != null) { - logger.trace("installing JGroups message digest {}", digest); - this.myChannel.getProtocolStack() - .getTopProtocol().down(new Event(Event.MERGE_DIGEST, digest)); - jrsp.setMessengerData(null); - } + logger.trace("installing JGroups message digest {}", digest); + this.myChannel.getProtocolStack() + .getTopProtocol().down(new Event(Event.MERGE_DIGEST, digest)); + jrsp.setMessengerData(null); } catch (Exception e) { logger.fatal("Unable to read JGroups messaging digest", e); } @@ -961,12 +957,13 @@ public class JGroupsMessenger implements Messenger { /** * returns the member ID for the given GMSMember object */ + @SuppressWarnings("UnusedParameters") private InternalDistributedMember getMemberFromView(GMSMember jgId, short version) { NetView v = services.getJoinLeave().getView(); if (v != null) { for (InternalDistributedMember m: v.getMembers()) { - if (((GMSMember)m.getNetMember()).equals(jgId)) { + if (m.getNetMember().equals(jgId)) { return m; } } @@ -1090,7 +1087,7 @@ public class JGroupsMessenger implements Messenger { } } if (h == null) { - h = (MessageHandler)services.getManager(); + h = services.getManager(); } return h; }