gossip-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecapri...@apache.org
Subject incubator-gossip git commit: GOSSIP-39 User Defined Active Gossip (review by Dorian Ellerbe)
Date Fri, 27 Jan 2017 06:36:31 GMT
Repository: incubator-gossip
Updated Branches:
  refs/heads/master b2af44907 -> 3f1882fbc


GOSSIP-39 User Defined Active Gossip (review by Dorian Ellerbe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/3f1882fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/3f1882fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/3f1882fb

Branch: refs/heads/master
Commit: 3f1882fbcf805f1b400411a2e1bd936784788f8c
Parents: b2af449
Author: Edward Capriolo <edlinuxguru@gmail.com>
Authored: Sun Jan 22 21:32:18 2017 -0500
Committer: Edward Capriolo <edlinuxguru@gmail.com>
Committed: Fri Jan 27 01:34:08 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/gossip/GossipMember.java    |  15 +-
 .../java/org/apache/gossip/GossipRunner.java    |  57 -----
 .../java/org/apache/gossip/GossipService.java   |   7 +-
 .../java/org/apache/gossip/GossipSettings.java  |  23 ++
 .../org/apache/gossip/LocalGossipMember.java    |   5 +-
 .../org/apache/gossip/RemoteGossipMember.java   |   9 +-
 .../java/org/apache/gossip/StartupSettings.java |  12 +-
 .../apache/gossip/accrual/FailureDetector.java  |   9 +-
 .../apache/gossip/examples/GossipExample.java   |   5 +-
 .../apache/gossip/examples/StandAloneNode.java  |   5 +-
 .../gossip/manager/AbstractActiveGossiper.java  | 141 +++++++++++
 .../gossip/manager/ActiveGossipThread.java      | 218 -----------------
 .../DatacenterRackAwareActiveGossiper.java      | 233 +++++++++++++++++++
 .../org/apache/gossip/manager/GossipCore.java   |   5 +-
 .../apache/gossip/manager/GossipManager.java    |  23 +-
 .../gossip/manager/SimpleActiveGossipper.java   | 111 +++++++++
 .../manager/random/RandomGossipManager.java     |  19 +-
 .../org/apache/gossip/model/GossipMember.java   |  17 ++
 .../gossip/udp/UdpActiveGossipMessage.java      |   5 +-
 src/test/java/org/apache/gossip/DataTest.java   |  43 ++--
 .../org/apache/gossip/GossipMemberTest.java     |   5 +-
 .../org/apache/gossip/IdAndPropertyTest.java    |  84 +++++++
 .../org/apache/gossip/ShutdownDeadtimeTest.java |   7 +-
 .../org/apache/gossip/StartupSettingsTest.java  |   6 +-
 .../org/apache/gossip/TenNodeThreeSeedTest.java |   3 +-
 .../gossip/accrual/FailureDetectorTest.java     |   6 +-
 .../manager/RandomGossipManagerBuilderTest.java |  20 +-
 27 files changed, 740 insertions(+), 353 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipMember.java b/src/main/java/org/apache/gossip/GossipMember.java
index a2f416e..f2834bd 100644
--- a/src/main/java/org/apache/gossip/GossipMember.java
+++ b/src/main/java/org/apache/gossip/GossipMember.java
@@ -19,6 +19,7 @@ package org.apache.gossip;
 
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Map;
 
 /**
  * A abstract class representing a gossip member.
@@ -40,6 +41,9 @@ public abstract class GossipMember implements Comparable<GossipMember> {
    */
   protected String id;
 
+  /* properties provided at startup time */
+  protected Map<String,String> properties;
+  
   /**
    * Constructor.
    *
@@ -52,11 +56,12 @@ public abstract class GossipMember implements Comparable<GossipMember> {
    * @param id
    *          An id that may be replaced after contact
    */
-  public GossipMember(String clusterName, URI uri, String id, long heartbeat) {
+  public GossipMember(String clusterName, URI uri, String id, long heartbeat, Map<String,String> properties) {
     this.clusterName = clusterName;
     this.id = id;
     this.heartbeat = heartbeat;
     this.uri = uri;
+    this.properties = properties;
   }
 
   /**
@@ -104,6 +109,14 @@ public abstract class GossipMember implements Comparable<GossipMember> {
     this.id = _id;
   }
 
+  public Map<String, String> getProperties() {
+    return properties;
+  }
+
+  public void setProperties(Map<String, String> properties) {
+    this.properties = properties;
+  }
+
   public String toString() {
     return "Member [address=" + getAddress() + ", id=" + id + ", heartbeat=" + heartbeat + "]";
   }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/GossipRunner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipRunner.java b/src/main/java/org/apache/gossip/GossipRunner.java
deleted file mode 100644
index c8a1f13..0000000
--- a/src/main/java/org/apache/gossip/GossipRunner.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-
-public class GossipRunner {
-
-  public static void main(String[] args) throws URISyntaxException {
-    File configFile;
-    if (args.length == 1) {
-      configFile = new File("./" + args[0]);
-    } else {
-      configFile = new File("gossip.conf");
-    }
-    new GossipRunner(configFile);
-  }
-
-  public GossipRunner(File configFile) throws URISyntaxException {
-    if (configFile != null && configFile.exists()) {
-      try {
-        System.out.println("Parsing the configuration file...");
-        StartupSettings _settings = StartupSettings.fromJSONFile(configFile);
-        GossipService gossipService = new GossipService(_settings);
-        System.out.println("Gossip service successfully initialized, let's start it...");
-        gossipService.start();
-      } catch (FileNotFoundException e) {
-        System.err.println("The given file is not found!");
-      } catch (IOException e) {
-        System.err.println("Could not read the configuration file: " + e.getMessage());
-      } catch (InterruptedException e) {
-        System.err.println("Error while starting the gossip service: " + e.getMessage());
-      }
-    } else {
-      System.out
-              .println("The gossip.conf file is not found.\n\nEither specify the path to the startup settings file or place the gossip.json file in the same folder as the JAR file.");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/GossipService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
index fca9f28..f32eb35 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -20,7 +20,9 @@ package org.apache.gossip;
 import com.codahale.metrics.MetricRegistry;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import com.codahale.metrics.JmxReporter;
 import org.apache.gossip.event.GossipListener;
@@ -50,7 +52,7 @@ public class GossipService {
   public GossipService(StartupSettings startupSettings) throws InterruptedException,
           UnknownHostException {
     this(startupSettings.getCluster(), startupSettings.getUri()
-            , startupSettings.getId(), startupSettings.getGossipMembers(),
+            , startupSettings.getId(), new HashMap<String,String> (),startupSettings.getGossipMembers(),
             startupSettings.getGossipSettings(), null, new MetricRegistry());
   }
 
@@ -60,7 +62,7 @@ public class GossipService {
    * @throws InterruptedException
    * @throws UnknownHostException
    */
-  public GossipService(String cluster, URI uri, String id,
+  public GossipService(String cluster, URI uri, String id, Map<String,String> properties,
           List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener, MetricRegistry registry)
           throws InterruptedException, UnknownHostException {
     jmxReporter = JmxReporter.forRegistry(registry).build();
@@ -73,6 +75,7 @@ public class GossipService {
         .gossipMembers(gossipMembers)
         .listener(listener)
         .registry(registry)
+        .properties(properties)
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/GossipSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java
index 36fabb6..1fed914 100644
--- a/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/src/main/java/org/apache/gossip/GossipSettings.java
@@ -17,6 +17,9 @@
  */
 package org.apache.gossip;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * In this object the settings used by the GossipService are held.
  * 
@@ -41,6 +44,10 @@ public class GossipSettings {
   
   private String distribution = "exponential";
   
+  private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";
+  
+  private Map<String,String> activeGossipProperties = new HashMap<>();
+  
   /**
    * Construct GossipSettings with default settings.
    */
@@ -139,5 +146,21 @@ public class GossipSettings {
   public void setDistribution(String distribution) {
     this.distribution = distribution;
   }
+
+  public String getActiveGossipClass() {
+    return activeGossipClass;
+  }
+
+  public void setActiveGossipClass(String activeGossipClass) {
+    this.activeGossipClass = activeGossipClass;
+  }
+
+  public Map<String, String> getActiveGossipProperties() {
+    return activeGossipProperties;
+  }
+
+  public void setActiveGossipProperties(Map<String, String> activeGossipProperties) {
+    this.activeGossipProperties = activeGossipProperties;
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/LocalGossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java
index 83a13df..557ffcb 100644
--- a/src/main/java/org/apache/gossip/LocalGossipMember.java
+++ b/src/main/java/org/apache/gossip/LocalGossipMember.java
@@ -18,6 +18,7 @@
 package org.apache.gossip;
 
 import java.net.URI;
+import java.util.Map;
 
 import org.apache.gossip.accrual.FailureDetector;
 
@@ -40,8 +41,8 @@ public class LocalGossipMember extends GossipMember {
    *          The current heartbeat
    */
   public LocalGossipMember(String clusterName, URI uri, String id,
-          long heartbeat, int windowSize, int minSamples, String distribution) {
-    super(clusterName, uri, id, heartbeat );
+          long heartbeat, Map<String,String> properties, int windowSize, int minSamples, String distribution) {
+    super(clusterName, uri, id, heartbeat, properties );
     detector = new FailureDetector(this, minSamples, windowSize, distribution);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/RemoteGossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/RemoteGossipMember.java b/src/main/java/org/apache/gossip/RemoteGossipMember.java
index a9e6a76..e3f6620 100644
--- a/src/main/java/org/apache/gossip/RemoteGossipMember.java
+++ b/src/main/java/org/apache/gossip/RemoteGossipMember.java
@@ -18,12 +18,13 @@
 package org.apache.gossip;
 
 import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * The object represents a gossip member with the properties as received from a remote gossip
  * member.
  * 
- * @author harmenw
  */
 public class RemoteGossipMember extends GossipMember {
 
@@ -35,12 +36,12 @@ public class RemoteGossipMember extends GossipMember {
    * @param heartbeat
    *          The current heartbeat
    */
-  public RemoteGossipMember(String clusterName, URI uri, String id, long heartbeat) {
-    super(clusterName, uri, id, heartbeat);
+  public RemoteGossipMember(String clusterName, URI uri, String id, long heartbeat, Map<String,String> properties) {
+    super(clusterName, uri, id, heartbeat, properties);
   }
 
   public RemoteGossipMember(String clusterName, URI uri, String id) {
-    super(clusterName, uri, id, System.currentTimeMillis());
+    super(clusterName, uri, id, System.nanoTime(), new HashMap<String,String>());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/StartupSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java
index de63c66..0117be7 100644
--- a/src/main/java/org/apache/gossip/StartupSettings.java
+++ b/src/main/java/org/apache/gossip/StartupSettings.java
@@ -23,8 +23,11 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.log4j.Logger;
 
@@ -164,6 +167,13 @@ public class StartupSettings {
     JsonNode jsonObject = root.get(0);
     String uri = jsonObject.get("uri").textValue();
     String id = jsonObject.get("id").textValue();
+    Map<String,String> properties = new HashMap<String,String>();
+    JsonNode n = jsonObject.get("properties");
+    Iterator<Entry<String, JsonNode>> l = n.fields();
+    while (l.hasNext()){
+      Entry<String, JsonNode> i = l.next();
+      properties.put(i.getKey(), i.getValue().asText());
+    }
     //TODO constants as defaults?
     int gossipInterval = jsonObject.get("gossip_interval").intValue();
     int cleanupInterval = jsonObject.get("cleanup_interval").intValue();
@@ -186,7 +196,7 @@ public class StartupSettings {
       JsonNode child = it.next();
       URI uri3 = new URI(child.get("uri").textValue());
       RemoteGossipMember member = new RemoteGossipMember(child.get("cluster").asText(),
-              uri3, "", 0);
+              uri3, "", 0, new HashMap<String,String>());
       settings.addGossipMember(member);
       configMembersDetails += member.getAddress();
       configMembersDetails += ", ";

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/accrual/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/accrual/FailureDetector.java b/src/main/java/org/apache/gossip/accrual/FailureDetector.java
index b6a12b6..10d66a9 100644
--- a/src/main/java/org/apache/gossip/accrual/FailureDetector.java
+++ b/src/main/java/org/apache/gossip/accrual/FailureDetector.java
@@ -48,6 +48,9 @@ public class FailureDetector {
   public void recordHeartbeat(long now){
     if (now < latestHeartbeatMs)
       return;
+    if (now - latestHeartbeatMs == 0){
+      return;
+    }
     synchronized (descriptiveStatistics) {
       if (latestHeartbeatMs != -1){
         descriptiveStatistics.addValue(now - latestHeartbeatMs);
@@ -77,7 +80,11 @@ public class FailureDetector {
         }
         return -1.0d * Math.log10(probability);
       } catch (MathException | IllegalArgumentException e) {
-        e.printStackTrace();
+        StringBuilder sb = new StringBuilder();
+        for ( double d: descriptiveStatistics.getSortedValues()){
+          sb.append(d + " ");
+        }
+        LOGGER.debug(e.getMessage() +" "+ sb.toString() +" "+ descriptiveStatistics);
         throw new IllegalArgumentException(e);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/examples/GossipExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/examples/GossipExample.java b/src/main/java/org/apache/gossip/examples/GossipExample.java
index 01cd3e3..8236d46 100644
--- a/src/main/java/org/apache/gossip/examples/GossipExample.java
+++ b/src/main/java/org/apache/gossip/examples/GossipExample.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.gossip.GossipMember;
@@ -74,14 +75,14 @@ public class GossipExample extends Thread {
         } catch (URISyntaxException e) {
           throw new RuntimeException(e);
         }
-        startupMembers.add(new RemoteGossipMember(cluster, u, "", 0 ));
+        startupMembers.add(new RemoteGossipMember(cluster, u, "", 0, new HashMap<String,String>()));
       }
 
       // Lets start the gossip clients.
       // Start the clients, waiting cleaning-interval + 1 second between them which will show the
       // dead list handling.
       for (GossipMember member : startupMembers) {
-        GossipService gossipService = new GossipService(cluster,  member.getUri(), "",
+        GossipService gossipService = new GossipService(cluster,  member.getUri(), "", new HashMap<String,String>(),
                 startupMembers, settings, null, new MetricRegistry());
         clients.add(gossipService);
         gossipService.start();

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/examples/StandAloneNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
index d24c0fa..3564943 100644
--- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
@@ -21,6 +21,7 @@ import com.codahale.metrics.MetricRegistry;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.util.Arrays;
+import java.util.HashMap;
 
 import org.apache.gossip.GossipService;
 import org.apache.gossip.GossipSettings;
@@ -31,8 +32,8 @@ public class StandAloneNode {
     GossipSettings s = new GossipSettings();
     s.setWindowSize(10);
     s.setConvictThreshold(1.0);
-    s.setGossipInterval(1000);
-    GossipService gossipService = new GossipService("mycluster",  URI.create(args[0]), args[1],
+    s.setGossipInterval(10);
+    GossipService gossipService = new GossipService("mycluster",  URI.create(args[0]), args[1], new HashMap<String, String>(),
             Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry());
     gossipService.start();
     while (true){

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
new file mode 100644
index 0000000..d58aeb9
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.model.ActiveGossipOk;
+import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.GossipMember;
+import org.apache.gossip.model.Response;
+import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.gossip.udp.UdpGossipDataMessage;
+import org.apache.gossip.udp.UdpSharedGossipDataMessage;
+import org.apache.log4j.Logger;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+/**
+ * The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner
+ */
+public abstract class AbstractActiveGossiper {
+
+  protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class);
+  
+  protected final GossipManager gossipManager;
+  protected final GossipCore gossipCore;
+  private final Histogram sharedDataHistogram;
+  private final Histogram sendPerNodeDataHistogram;
+  private final Histogram sendMembershipHistorgram;
+
+  public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
+    this.gossipManager = gossipManager;
+    this.gossipCore = gossipCore;
+    sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
+    sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
+    sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time"));
+  }
+
+  public void init() {
+
+  }
+  
+  public void shutdown() {
+
+  }
+
+  public final void sendSharedData(LocalGossipMember me, LocalGossipMember member){
+    if (member == null){
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    for (Entry<String, SharedGossipDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
+      UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
+      message.setUuid(UUID.randomUUID().toString());
+      message.setUriFrom(me.getId());
+      message.setExpireAt(innerEntry.getValue().getExpireAt());
+      message.setKey(innerEntry.getValue().getKey());
+      message.setNodeId(innerEntry.getValue().getNodeId());
+      message.setTimestamp(innerEntry.getValue().getTimestamp());
+      message.setPayload(innerEntry.getValue().getPayload());
+      gossipCore.sendOneWay(message, member.getUri());
+    }
+    sharedDataHistogram.update(System.currentTimeMillis() - startTime);
+  }
+  
+  public final void sendPerNodeData(LocalGossipMember me, LocalGossipMember member){
+    if (member == null){
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
+      for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
+        UdpGossipDataMessage message = new UdpGossipDataMessage();
+        message.setUuid(UUID.randomUUID().toString());
+        message.setUriFrom(me.getId());
+        message.setExpireAt(innerEntry.getValue().getExpireAt());
+        message.setKey(innerEntry.getValue().getKey());
+        message.setNodeId(innerEntry.getValue().getNodeId());
+        message.setTimestamp(innerEntry.getValue().getTimestamp());
+        message.setPayload(innerEntry.getValue().getPayload());
+        gossipCore.sendOneWay(message, member.getUri());   
+      }
+    }
+    sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
+  }
+    
+  /**
+   * Performs the sending of the membership list, after we have incremented our own heartbeat.
+   */
+  protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
+    if (member == null){
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    me.setHeartbeat(System.nanoTime());
+    UdpActiveGossipMessage message = new UdpActiveGossipMessage();
+    message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
+    message.setUuid(UUID.randomUUID().toString());
+    message.getMembers().add(convert(me));
+    for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
+      message.getMembers().add(convert(other));
+    }
+    Response r = gossipCore.send(message, member.getUri());
+    if (r instanceof ActiveGossipOk){
+      //maybe count metrics here
+    } else {
+      LOGGER.debug("Message " + message + " generated response " + r);
+    }
+    sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
+  }
+    
+  protected final GossipMember convert(LocalGossipMember member){
+    GossipMember gm = new GossipMember();
+    gm.setCluster(member.getClusterName());
+    gm.setHeartbeat(member.getHeartbeat());
+    gm.setUri(member.getUri().toASCIIString());
+    gm.setId(member.getId());
+    gm.setProperties(member.getProperties());
+    return gm;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
deleted file mode 100644
index f81565b..0000000
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-import java.util.List;
-
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.gossip.LocalGossipMember;
-import org.apache.gossip.model.ActiveGossipOk;
-import org.apache.gossip.model.GossipDataMessage;
-import org.apache.gossip.model.GossipMember;
-import org.apache.gossip.model.Response;
-import org.apache.gossip.model.SharedGossipDataMessage;
-import org.apache.gossip.udp.UdpActiveGossipMessage;
-import org.apache.gossip.udp.UdpGossipDataMessage;
-import org.apache.gossip.udp.UdpSharedGossipDataMessage;
-import org.apache.log4j.Logger;
-
-import static com.codahale.metrics.MetricRegistry.name;
-
-/**
- * The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner
- */
-public class ActiveGossipThread {
-
-  private static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class);
-  
-  private final GossipManager gossipManager;
-  private final Random random;
-  private final GossipCore gossipCore;
-  private ScheduledExecutorService scheduledExecutorService;
-  private final BlockingQueue<Runnable> workQueue;
-  private ThreadPoolExecutor threadService;
-
-  private final Histogram sharedDataHistogram;
-  private final Histogram sendPerNodeDataHistogram;
-  private final Histogram sendMembershipHistorgram;
-
-  public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
-    this.gossipManager = gossipManager;
-    random = new Random();
-    this.gossipCore = gossipCore;
-    scheduledExecutorService = Executors.newScheduledThreadPool(2);
-    workQueue = new ArrayBlockingQueue<Runnable>(1024);
-    threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
-    sharedDataHistogram = registry.histogram(name(ActiveGossipThread.class, "sharedDataHistogram-time"));
-    sendPerNodeDataHistogram = registry.histogram(name(ActiveGossipThread.class, "sendPerNodeDataHistogram-time"));
-    sendMembershipHistorgram = registry.histogram(name(ActiveGossipThread.class, "sendMembershipHistorgram-time"));
-  }
-
-
-  public void init() {
-    scheduledExecutorService.scheduleAtFixedRate(
-            () -> { 
-              threadService.execute( () -> { sendToALiveMember(); });
-            }, 0,
-            gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
-    scheduledExecutorService.scheduleAtFixedRate(
-            () -> { this.sendToDeadMember(); }, 0,
-            gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
-    scheduledExecutorService.scheduleAtFixedRate(
-            () -> sendPerNodeData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
-            gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
-    scheduledExecutorService.scheduleAtFixedRate(
-            () -> sendSharedData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
-            gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
-  }
-  
-  public void shutdown() {
-    scheduledExecutorService.shutdown();
-    try {
-      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      LOGGER.debug("Issue during shurdown" + e);
-    }
-  }
-
-  public void sendSharedData(LocalGossipMember me, List<LocalGossipMember> memberList){
-    long startTime = System.currentTimeMillis();
-
-    LocalGossipMember member = selectPartner(memberList);
-    if (member == null) {
-      LOGGER.debug("Send sendMembershipList() is called without action");
-      sharedDataHistogram.update(System.currentTimeMillis() - startTime);
-      return;
-    }    
-    for (Entry<String, SharedGossipDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
-        UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
-        message.setUuid(UUID.randomUUID().toString());
-        message.setUriFrom(me.getId());
-        message.setExpireAt(innerEntry.getValue().getExpireAt());
-        message.setKey(innerEntry.getValue().getKey());
-        message.setNodeId(innerEntry.getValue().getNodeId());
-        message.setTimestamp(innerEntry.getValue().getTimestamp());
-        message.setPayload(innerEntry.getValue().getPayload());
-        gossipCore.sendOneWay(message, member.getUri());
-    }
-    sharedDataHistogram.update(System.currentTimeMillis() - startTime);
-  }
-  
-  public void sendPerNodeData(LocalGossipMember me, List<LocalGossipMember> memberList){
-    long startTime = System.currentTimeMillis();
-
-    LocalGossipMember member = selectPartner(memberList);
-    if (member == null) {
-      LOGGER.debug("Send sendMembershipList() is called without action");
-      sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
-      return;
-    }    
-    for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
-      for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
-        UdpGossipDataMessage message = new UdpGossipDataMessage();
-        message.setUuid(UUID.randomUUID().toString());
-        message.setUriFrom(me.getId());
-        message.setExpireAt(innerEntry.getValue().getExpireAt());
-        message.setKey(innerEntry.getValue().getKey());
-        message.setNodeId(innerEntry.getValue().getNodeId());
-        message.setTimestamp(innerEntry.getValue().getTimestamp());
-        message.setPayload(innerEntry.getValue().getPayload());
-        gossipCore.sendOneWay(message, member.getUri());   
-      }
-    }
-    sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
-  }
-  
-  protected void sendToALiveMember(){
-    LocalGossipMember member = selectPartner(gossipManager.getLiveMembers());
-    System.out.println("send" );
-    sendMembershipList(gossipManager.getMyself(), member);
-  }
-  
-  protected void sendToDeadMember(){
-    LocalGossipMember member = selectPartner(gossipManager.getDeadMembers());
-    sendMembershipList(gossipManager.getMyself(), member);
-  }
-  
-  /**
-   * Performs the sending of the membership list, after we have incremented our own heartbeat.
-   */
-  protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
-    long startTime = System.currentTimeMillis();
-    me.setHeartbeat(System.nanoTime());
-    if (member == null) {
-      LOGGER.debug("Send sendMembershipList() is called without action");
-      sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
-      return;
-    } else {
-      LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
-    }
-    UdpActiveGossipMessage message = new UdpActiveGossipMessage();
-    message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
-    message.setUuid(UUID.randomUUID().toString());
-    message.getMembers().add(convert(me));
-    for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
-      message.getMembers().add(convert(other));
-    }
-    Response r = gossipCore.send(message, member.getUri());
-    if (r instanceof ActiveGossipOk){
-      //maybe count metrics here
-    } else {
-      LOGGER.debug("Message " + message + " generated response " + r);
-    }
-    sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
-  }
-  
-  /**
-   * 
-   * @param memberList
-   *          The list of members which are stored in the local list of members.
-   * @return The chosen LocalGossipMember to gossip with.
-   */
-  protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
-    LocalGossipMember member = null;
-    if (memberList.size() > 0) {
-      int randomNeighborIndex = random.nextInt(memberList.size());
-      member = memberList.get(randomNeighborIndex);
-    } else {
-      LOGGER.debug("I am alone in this world.");
-    }
-    return member;
-  }
-  
-  private GossipMember convert(LocalGossipMember member){
-    GossipMember gm = new GossipMember();
-    gm.setCluster(member.getClusterName());
-    gm.setHeartbeat(member.getHeartbeat());
-    gm.setUri(member.getUri().toASCIIString());
-    gm.setId(member.getId());
-    return gm;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
new file mode 100644
index 0000000..40b9c28
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.util.List;
+import java.util.Random;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.LocalGossipMember;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Sends gossip traffic at different rates to other racks and data-centers.
+ * This implementation controls the rate at which gossip traffic is shared. 
+ * There are two constructs Datacenter and Rack. It is assumed that bandwidth and latency is higher
+ * in the rack than in the the datacenter. We can adjust the rate at which we send messages to each group.
+ * 
+ */
+public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
+
+  public static final String DATACENTER = "datacenter";
+  public static final String RACK = "rack";
+  
+  private int sameRackGossipIntervalMs = 100;
+  private int sameDcGossipIntervalMs = 500;
+  private int differentDatacenterGossipIntervalMs = 1000;
+  private int randomDeadMemberSendIntervalMs = 250;
+  
+  private ScheduledExecutorService scheduledExecutorService;
+  private final BlockingQueue<Runnable> workQueue;
+  private ThreadPoolExecutor threadService;
+  private final Random random;
+  
+  public DatacenterRackAwareActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
+          MetricRegistry registry) {
+    super(gossipManager, gossipCore, registry);
+    scheduledExecutorService = Executors.newScheduledThreadPool(2);
+    workQueue = new ArrayBlockingQueue<Runnable>(1024);
+    threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
+            new ThreadPoolExecutor.DiscardOldestPolicy());
+    random = new Random();
+    try {
+      sameRackGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
+              .getActiveGossipProperties().get("sameRackGossipIntervalMs"));
+    } catch (RuntimeException ex) { }
+    try {
+      sameDcGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
+              .getActiveGossipProperties().get("sameDcGossipIntervalMs"));
+    } catch (RuntimeException ex) { }
+    try {
+      differentDatacenterGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
+              .getActiveGossipProperties().get("differentDatacenterGossipIntervalMs"));
+    } catch (RuntimeException ex) { }
+    try {
+      randomDeadMemberSendIntervalMs = Integer.parseInt(gossipManager.getSettings()
+              .getActiveGossipProperties().get("randomDeadMemberSendIntervalMs"));
+    } catch (RuntimeException ex) { }
+  }
+
+  @Override
+  public void init() {
+    super.init();
+    //same rack
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+      threadService.execute(() -> sendToSameRackMember()), 
+      0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+      threadService.execute(() -> sendToSameRackMemberPerNode()), 
+      0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+      threadService.execute(() -> sendToSameRackShared()), 
+      0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    //same dc different rack
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+      threadService.execute(() -> sameDcDiffernetRackMember()), 
+      0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+    threadService.execute(() -> sameDcDiffernetRackPerNode()), 
+    0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+    threadService.execute(() -> sameDcDiffernetRackShared()), 
+    0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    //different dc
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+      threadService.execute(() -> differentDcMember()), 
+      0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+    threadService.execute(() -> differentDcPerNode()), 
+    0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
+  
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+    threadService.execute(() -> differentDcShared()), 
+    0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
+    
+    //the dead
+    scheduledExecutorService.scheduleAtFixedRate(() -> 
+      threadService.execute(() -> sendToDeadMember()), 
+      0, randomDeadMemberSendIntervalMs, TimeUnit.MILLISECONDS);
+    
+  }
+
+  private void sendToDeadMember() {
+    sendMembershipList(gossipManager.getMyself(), selectPartner(gossipManager.getDeadMembers()));
+  }
+  
+  private List<LocalGossipMember> differentDataCenter(){
+    String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
+    String rack = gossipManager.getMyself().getProperties().get(RACK);
+    if (myDc == null|| rack == null){
+      return Collections.emptyList();
+    }
+    List<LocalGossipMember> notMyDc = new ArrayList<LocalGossipMember>(10);
+    for (LocalGossipMember i : gossipManager.getLiveMembers()){
+      if (!myDc.equals(i.getProperties().get(DATACENTER))){
+        notMyDc.add(i);
+      }
+    }
+    return notMyDc;
+  }
+  
+  private List<LocalGossipMember> sameDatacenterDifferentRack(){
+    String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
+    String rack = gossipManager.getMyself().getProperties().get(RACK);
+    if (myDc == null|| rack == null){
+      return Collections.emptyList();
+    }
+    List<LocalGossipMember> notMyDc = new ArrayList<LocalGossipMember>(10);
+    for (LocalGossipMember i : gossipManager.getLiveMembers()){
+      if (myDc.equals(i.getProperties().get(DATACENTER)) && !rack.equals(i.getProperties().get(RACK))){
+        notMyDc.add(i);
+      }
+    }
+    return notMyDc;
+  }
+    
+  private List<LocalGossipMember> sameRackNodes(){
+    String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
+    String rack = gossipManager.getMyself().getProperties().get(RACK);
+    if (myDc == null|| rack == null){
+      return Collections.emptyList();
+    }
+    List<LocalGossipMember> sameDcAndRack = new ArrayList<LocalGossipMember>(10);
+    for (LocalGossipMember i : gossipManager.getLiveMembers()){
+      if (myDc.equals(i.getProperties().get(DATACENTER))
+              && rack.equals(i.getProperties().get(RACK))){
+        sameDcAndRack.add(i);
+      }
+    }
+    return sameDcAndRack;
+  }
+
+  private void sendToSameRackMember() {
+    sendMembershipList(gossipManager.getMyself(), selectPartner(sameRackNodes()));
+  }
+  
+  private void sendToSameRackMemberPerNode() {
+    sendPerNodeData(gossipManager.getMyself(), selectPartner(sameRackNodes()));
+  }
+  
+  private void sendToSameRackShared() {
+    sendSharedData(gossipManager.getMyself(), selectPartner(sameRackNodes()));
+  }
+  
+  private void differentDcMember() {
+    sendMembershipList(gossipManager.getMyself(), selectPartner(differentDataCenter()));
+  }
+  
+  private void differentDcPerNode() {
+    sendPerNodeData(gossipManager.getMyself(), selectPartner(differentDataCenter()));
+  }
+  
+  private void differentDcShared() {
+    sendSharedData(gossipManager.getMyself(), selectPartner(differentDataCenter()));
+  }
+  
+  private void sameDcDiffernetRackMember() {
+    sendMembershipList(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack()));
+  }
+  
+  private void sameDcDiffernetRackPerNode() {
+    sendPerNodeData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack()));
+  }
+  
+  private void sameDcDiffernetRackShared() {
+    sendSharedData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack()));
+  }
+  
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+  }
+
+  protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
+    LocalGossipMember member = null;
+    if (memberList.size() > 0) {
+      int randomNeighborIndex = random.nextInt(memberList.size());
+      member = memberList.get(randomNeighborIndex);
+    }
+    return member;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index 5d561c3..31bd447 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -165,7 +165,8 @@ public class GossipCore implements GossipCoreConstants {
                 activeGossipMessage.getMembers().get(i).getCluster(),
                 u,
                 activeGossipMessage.getMembers().get(i).getId(),
-                activeGossipMessage.getMembers().get(i).getHeartbeat());
+                activeGossipMessage.getMembers().get(i).getHeartbeat(),
+                activeGossipMessage.getMembers().get(i).getProperties());
         if (i == 0) {
           senderMember = member;
         } 
@@ -321,6 +322,7 @@ public class GossipCore implements GossipCoreConstants {
       remoteMember.getUri(), 
       remoteMember.getId(), 
       remoteMember.getHeartbeat(), 
+      remoteMember.getProperties(),
       gossipManager.getSettings().getWindowSize(),
       gossipManager.getSettings().getMinimumSamples(),
       gossipManager.getSettings().getDistribution());
@@ -331,6 +333,7 @@ public class GossipCore implements GossipCoreConstants {
           if (localMember.getKey().getId().equals(remoteMember.getId())){
             localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
             localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
+            localMember.getKey().setProperties(remoteMember.getProperties());
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index cf67c9c..840efb9 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -18,9 +18,13 @@
 package org.apache.gossip.manager;
 
 import com.codahale.metrics.MetricRegistry;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -59,7 +63,7 @@ public abstract class GossipManager {
 
   private final GossipListener listener;
 
-  private ActiveGossipThread activeGossipThread;
+  private AbstractActiveGossiper activeGossipThread;
 
   private PassiveGossipThread passiveGossipThread;
 
@@ -76,21 +80,22 @@ public abstract class GossipManager {
   private MetricRegistry registry;
 
   public GossipManager(String cluster,
-          URI uri, String id, GossipSettings settings,
+          URI uri, String id, Map<String,String> properties, GossipSettings settings,
           List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
     
     this.settings = settings;
     gossipCore = new GossipCore(this, registry);
     clock = new SystemClock();
     dataReaper = new DataReaper(gossipCore, clock);
-    me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(),
+    me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties,
             settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
     members = new ConcurrentSkipListMap<>();
     for (GossipMember startupMember : gossipMembers) {
       if (!startupMember.equals(me)) {
         LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
                 startupMember.getUri(), startupMember.getId(),
-                clock.nanoTime(), settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
+                clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(), 
+                settings.getMinimumSamples(), settings.getDistribution());
         //TODO should members start in down state?
         members.put(member, GossipState.DOWN);
       }
@@ -137,6 +142,14 @@ public abstract class GossipManager {
     return me;
   }
 
+  private AbstractActiveGossiper constructActiveGossiper(){
+    try {
+      Constructor<?> c = Class.forName(settings.getActiveGossipClass()).getConstructor(GossipManager.class, GossipCore.class, MetricRegistry.class);
+      return (AbstractActiveGossiper) c.newInstance(this, gossipCore, registry);
+    } catch (NoSuchMethodException | SecurityException | ClassNotFoundException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      throw new RuntimeException(e);
+    }
+  }
   /**
    * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
    * thread and start the receiver thread.
@@ -144,7 +157,7 @@ public abstract class GossipManager {
   public void init() {
     passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
     gossipThreadExecutor.execute(passiveGossipThread);
-    activeGossipThread = new ActiveGossipThread(this, this.gossipCore, registry);
+    activeGossipThread = constructActiveGossiper();
     activeGossipThread.init();
     dataReaper.init();
     scheduledServiced.scheduleAtFixedRate(() -> {

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
new file mode 100644
index 0000000..43237fb
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.LocalGossipMember;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Base implementation gossips randomly to live nodes periodically gossips to dead ones
+ *
+ */
+public class SimpleActiveGossipper extends AbstractActiveGossiper {
+
+  private ScheduledExecutorService scheduledExecutorService;
+  private final BlockingQueue<Runnable> workQueue;
+  private ThreadPoolExecutor threadService;
+  private final Random random;
+  
+  public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore,
+          MetricRegistry registry) {
+    super(gossipManager, gossipCore, registry);
+    scheduledExecutorService = Executors.newScheduledThreadPool(2);
+    workQueue = new ArrayBlockingQueue<Runnable>(1024);
+    threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
+            new ThreadPoolExecutor.DiscardOldestPolicy());
+    random = new Random();
+  }
+
+  @Override
+  public void init() {
+    super.init();
+    scheduledExecutorService.scheduleAtFixedRate(() -> {
+      threadService.execute(() -> {
+        sendToALiveMember();
+      });
+    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+    scheduledExecutorService.scheduleAtFixedRate(() -> {
+      sendToDeadMember();
+    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+    scheduledExecutorService.scheduleAtFixedRate(
+            () -> sendPerNodeData(gossipManager.getMyself(),
+                    selectPartner(gossipManager.getLiveMembers())),
+            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+    scheduledExecutorService.scheduleAtFixedRate(
+            () -> sendSharedData(gossipManager.getMyself(),
+                    selectPartner(gossipManager.getLiveMembers())),
+            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    scheduledExecutorService.shutdown();
+    try {
+      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Issue during shutdown", e);
+    }
+  }
+
+  protected void sendToALiveMember(){
+    LocalGossipMember member = selectPartner(gossipManager.getLiveMembers());
+    sendMembershipList(gossipManager.getMyself(), member);
+  }
+  
+  protected void sendToDeadMember(){
+    LocalGossipMember member = selectPartner(gossipManager.getDeadMembers());
+    sendMembershipList(gossipManager.getMyself(), member);
+  }
+  
+  /**
+   * 
+   * @param memberList
+   *          The list of members which are stored in the local list of members.
+   * @return The chosen LocalGossipMember to gossip with.
+   */
+  protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
+    //TODO this selection is racey what if the list size changes?
+    LocalGossipMember member = null;
+    if (memberList.size() > 0) {
+      int randomNeighborIndex = random.nextInt(memberList.size());
+      member = memberList.get(randomNeighborIndex);
+    }
+    return member;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
index fd936f1..4a150be 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@ -25,8 +25,9 @@ import org.apache.gossip.manager.GossipManager;
 
 import java.net.URI;
 import java.util.List;
-
+import java.util.Map;
 import java.util.ArrayList;
+import java.util.HashMap;
 
 public class RandomGossipManager extends GossipManager {
 
@@ -42,6 +43,7 @@ public class RandomGossipManager extends GossipManager {
     private List<GossipMember> gossipMembers;
     private GossipListener listener;
     private MetricRegistry registry;
+    private Map<String,String> properties;
 
     private ManagerBuilder() {}
 
@@ -55,6 +57,11 @@ public class RandomGossipManager extends GossipManager {
       this.cluster = cluster;
       return this;
     }
+    
+    public ManagerBuilder properties(Map<String,String> properties) {
+      this.properties = properties;
+      return this;
+    }
 
     public ManagerBuilder withId(String id) {
       this.id = id;
@@ -75,6 +82,7 @@ public class RandomGossipManager extends GossipManager {
       this.listener = listener;
       return this;
     }
+    
     public ManagerBuilder registry(MetricRegistry registry) {
       this.registry = registry;
       return this;
@@ -91,18 +99,21 @@ public class RandomGossipManager extends GossipManager {
       checkArgument(settings != null, "You must specify gossip settings");
       checkArgument(uri != null, "You must specify a uri");
       checkArgument(registry != null, "You must specify a MetricRegistry");
+      if (properties == null){
+        properties = new HashMap<String,String>();
+      }
       if (listener == null){
         listener((a,b) -> {});
       }
       if (gossipMembers == null) {
         gossipMembers = new ArrayList<>();
       }
-      return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener, registry);
+      return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry);
     }
   }
 
-  private RandomGossipManager(String cluster, URI uri, String id, GossipSettings settings, 
+  private RandomGossipManager(String cluster, URI uri, String id, Map<String,String> properties,  GossipSettings settings, 
           List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
-    super(cluster, uri, id, settings, gossipMembers, listener, registry);
+    super(cluster, uri, id, properties, settings, gossipMembers, listener, registry);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/model/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/model/GossipMember.java b/src/main/java/org/apache/gossip/model/GossipMember.java
index 03ec13d..a318776 100644
--- a/src/main/java/org/apache/gossip/model/GossipMember.java
+++ b/src/main/java/org/apache/gossip/model/GossipMember.java
@@ -17,12 +17,15 @@
  */
 package org.apache.gossip.model;
 
+import java.util.Map;
+
 public class GossipMember {
 
   private String cluster;
   private String uri;
   private String id;
   private Long heartbeat;
+  private Map<String,String> properties;
   
   public GossipMember(){
     
@@ -66,5 +69,19 @@ public class GossipMember {
   public void setHeartbeat(Long heartbeat) {
     this.heartbeat = heartbeat;
   }
+
+  public Map<String, String> getProperties() {
+    return properties;
+  }
+
+  public void setProperties(Map<String, String> properties) {
+    this.properties = properties;
+  }
+
+  @Override
+  public String toString() {
+    return "GossipMember [cluster=" + cluster + ", uri=" + uri + ", id=" + id + ", heartbeat="
+            + heartbeat + ", properties=" + properties + "]";
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java b/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java
index 424d1ca..b6e8101 100644
--- a/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java
+++ b/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java
@@ -42,7 +42,8 @@ public class UdpActiveGossipMessage extends ActiveGossipMessage implements Track
 
   @Override
   public String toString() {
-    return "UdpActiveGossipMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]";
+    return "UdpActiveGossipMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + ", getMembers()="
+            + getMembers() + "]";
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/test/java/org/apache/gossip/DataTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index 766d72b..83879f9 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -22,6 +22,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -50,7 +51,7 @@ public class DataTest {
     for (int i = 1; i < clusterMembers+1; ++i) {
       URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
       GossipService gossipService = new GossipService(cluster, uri, i + "",
-              startupMembers, settings,
+              new HashMap<String,String>(), startupMembers, settings,
               (a,b) -> {}, new MetricRegistry());
       clients.add(gossipService);
       gossipService.start();
@@ -65,29 +66,27 @@ public class DataTest {
       }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
     clients.get(0).gossipPerNodeData(msg());
     clients.get(0).gossipSharedData(sharedMsg());
-    Thread.sleep(10000);
-    TUnit.assertThat(
-            new Callable<Object>() {
-              public Object call() throws Exception {
-                GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a");
-                if (x == null)
-                  return "";
-                else
-                  return x.getPayload();
-              }
-            }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
+
+    TUnit.assertThat(new Callable<Object>() {
+      public Object call() throws Exception {
+        GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a");
+        if (x == null)
+          return "";
+        else
+          return x.getPayload();
+      }
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
     
     
-    TUnit.assertThat(
-            new Callable<Object>() {
-              public Object call() throws Exception {
-                SharedGossipDataMessage x = clients.get(1).findSharedData("a");
-                if (x == null)
-                  return "";
-                else
-                  return x.getPayload();
-              }
-            }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
+    TUnit.assertThat(new Callable<Object>() {
+      public Object call() throws Exception {
+        SharedGossipDataMessage x = clients.get(1).findSharedData("a");
+        if (x == null)
+          return "";
+        else
+          return x.getPayload();
+      }
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
     
     
     for (int i = 0; i < clusterMembers; ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/test/java/org/apache/gossip/GossipMemberTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/GossipMemberTest.java b/src/test/java/org/apache/gossip/GossipMemberTest.java
index e15259e..272c7fb 100644
--- a/src/test/java/org/apache/gossip/GossipMemberTest.java
+++ b/src/test/java/org/apache/gossip/GossipMemberTest.java
@@ -19,6 +19,7 @@ package org.apache.gossip;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.HashMap;
 
 import org.junit.Assert;
 import org.junit.jupiter.api.Test;
@@ -31,9 +32,9 @@ public class GossipMemberTest {
   @Test
   public void testHashCodeFromGossip40() throws URISyntaxException {
     Assert.assertNotEquals(
-            new LocalGossipMember("mycluster", new URI("udp://4.4.4.4:1000"), "myid", 1, 10, 5, "exponential")
+            new LocalGossipMember("mycluster", new URI("udp://4.4.4.4:1000"), "myid", 1, new HashMap<String,String>(), 10, 5, "exponential")
                     .hashCode(),
-            new LocalGossipMember("mycluster", new URI("udp://4.4.4.5:1005"), "yourid", 11, 11, 6, "exponential")
+            new LocalGossipMember("mycluster", new URI("udp://4.4.4.5:1005"), "yourid", 11, new HashMap<String,String>(), 11, 6, "exponential")
                     .hashCode());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/test/java/org/apache/gossip/IdAndPropertyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/src/test/java/org/apache/gossip/IdAndPropertyTest.java
new file mode 100644
index 0000000..2a98f01
--- /dev/null
+++ b/src/test/java/org/apache/gossip/IdAndPropertyTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
+import org.junit.jupiter.api.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import com.codahale.metrics.MetricRegistry;
+
+import io.teknek.tunit.TUnit;
+
+@RunWith(JUnitPlatform.class)
+public class IdAndPropertyTest {
+
+  @Test
+  public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException{
+    GossipSettings settings = new GossipSettings();
+    settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
+    List<GossipMember> startupMembers = new ArrayList<>();
+    Map<String, String> x = new HashMap<>();
+    x.put("a", "b");
+    x.put("datacenter", "dc1");
+    x.put("rack", "rack1");
+    GossipService gossipService1 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0", x, startupMembers, settings,
+            (a, b) -> {}, new MetricRegistry());
+    gossipService1.start();
+    
+    Map<String, String> y = new HashMap<>();
+    y.put("a", "c");
+    y.put("datacenter", "dc2");
+    y.put("rack", "rack2");
+    GossipService gossipService2 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", y,
+            Arrays.asList(new RemoteGossipMember("a",
+                    new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0")),
+            settings, (a, b) -> { }, new MetricRegistry());
+    gossipService2.start();
+    TUnit.assertThat(() -> { 
+      String value = ""; 
+      try {
+        value = gossipService1.getGossipManager().getLiveMembers().get(0).getProperties().get("a");
+      } catch (RuntimeException e){ }
+      return value;
+    }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("c");
+    
+    TUnit.assertThat(() -> { 
+      String value = ""; 
+      try {
+        value = gossipService2.getGossipManager().getLiveMembers().get(0).getProperties().get("a");
+      } catch (RuntimeException e){ }
+      return value;
+    }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b");
+    
+    gossipService1.shutdown();
+    gossipService2.shutdown();
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index f5e34ba..9d02556 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -25,6 +25,7 @@ import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -58,7 +59,7 @@ public class ShutdownDeadtimeTest {
     final int clusterMembers = 5;
     for (int i = 1; i < clusterMembers + 1; ++i) {
       URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i));
-      GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers,
+      GossipService gossipService = new GossipService(cluster, uri, i + "", new HashMap<String,String>(), startupMembers,
               settings, (a,b) -> {}, new MetricRegistry());
       clients.add(gossipService);
       gossipService.start();
@@ -100,11 +101,11 @@ public class ShutdownDeadtimeTest {
         }
         return total;
       }
-    }).afterWaitingAtMost(30, TimeUnit.SECONDS).isEqualTo(4);
+    }).afterWaitingAtMost(50, TimeUnit.SECONDS).isEqualTo(4);
 
     URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
     // start client again
-    GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers,
+    GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", new HashMap<String,String>(), startupMembers,
             settings, (a,b) -> {}, new MetricRegistry());
     clients.add(gossipService);
     gossipService.start();

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/test/java/org/apache/gossip/StartupSettingsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java
index f0acabf..aa4d255 100644
--- a/src/test/java/org/apache/gossip/StartupSettingsTest.java
+++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.UUID;
 import org.junit.platform.runner.JUnitPlatform;
 import org.junit.runner.RunWith;
@@ -48,7 +49,7 @@ public class StartupSettingsTest {
     writeSettingsFile(settingsFile);
     URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000);
     final GossipService firstService = new GossipService(
-            CLUSTER, uri, "1",
+            CLUSTER, uri, "1", new HashMap<String, String>(),
       new ArrayList<GossipMember>(), new GossipSettings(), null, new MetricRegistry());
     firstService.start();
     final GossipService serviceUnderTest = new GossipService(
@@ -70,6 +71,7 @@ public class StartupSettingsTest {
             "  \"cleanup_interval\":10000,\n" +
             "  \"convict_threshold\":2.6,\n" +
             "  \"distribution\":\"exponential\",\n" +
+            "  \"properties\":{},\n" +
             "  \"members\":[\n" +
             "    {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" +
             "  ]\n" +
@@ -77,7 +79,7 @@ public class StartupSettingsTest {
 
     log.info( "Using settings file with contents of:\n---\n" + settings + "\n---" );
     FileOutputStream output = new FileOutputStream(target);
-    output.write( settings.getBytes() );
+    output.write(settings.getBytes());
     output.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index af7a117..bc4004d 100644
--- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -24,6 +24,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -58,7 +59,7 @@ public class TenNodeThreeSeedTest {
     final int clusterMembers = 5;
     for (int i = 1; i < clusterMembers+1; ++i) {
       URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
-      GossipService gossipService = new GossipService(cluster, uri, i + "",
+      GossipService gossipService = new GossipService(cluster, uri, i + "", new HashMap<String,String>(),
               startupMembers, settings, (a,b) -> {}, new MetricRegistry());
       clients.add(gossipService);
       gossipService.start();

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
index 99cf9c8..69d46b8 100644
--- a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
+++ b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
@@ -34,7 +34,7 @@ public class FailureDetectorTest {
     int samples = 1;
     int windowSize = 1000;
     LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), 
-            "", 0L, windowSize, samples, "normal");
+            "", 0L, null, windowSize, samples, "normal");
     member.recordHeartbeat(5);
     member.recordHeartbeat(10);
     Assert.assertEquals(new Double(0.3010299956639812), member.detect(10));
@@ -45,7 +45,7 @@ public class FailureDetectorTest {
     int samples = 1;
     int windowSize = 1000;
     LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), 
-            "", 0L, windowSize, samples, "exponential");
+            "", 0L, null, windowSize, samples, "exponential");
     member.recordHeartbeat(5);
     member.recordHeartbeat(10);
     Assert.assertEquals(new Double(0.4342944819032518), member.detect(10));
@@ -65,7 +65,7 @@ public class FailureDetectorTest {
     int samples = 1;
     int windowSize = 1000;
     LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), 
-            "", 0L, windowSize, samples, "exponential");
+            "", 0L, null, windowSize, samples, "exponential");
     member.recordHeartbeat(5);
     member.recordHeartbeat(5);
     member.recordHeartbeat(5);

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3f1882fb/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
index 1ef3a5b..cf38492 100644
--- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
+++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
@@ -21,19 +21,15 @@ import com.codahale.metrics.MetricRegistry;
 import org.apache.gossip.GossipMember;
 import org.apache.gossip.GossipSettings;
 import org.apache.gossip.LocalGossipMember;
-import org.apache.gossip.event.GossipListener;
-import org.apache.gossip.event.GossipState;
 import org.apache.gossip.manager.random.RandomGossipManager;
 import org.junit.jupiter.api.Test;
 import org.junit.platform.runner.JUnitPlatform;
 import org.junit.runner.RunWith;
 
-import javax.management.Notification;
-import javax.management.NotificationListener;
-
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -43,18 +39,6 @@ import static org.junit.jupiter.api.Assertions.expectThrows;
 @RunWith(JUnitPlatform.class)
 public class RandomGossipManagerBuilderTest {
 
-  public static class TestGossipListener implements GossipListener {
-    @Override
-    public void gossipEvent(GossipMember member, GossipState state) {
-    }
-  }
-
-  public static class TestNotificationListener implements NotificationListener {
-    @Override
-    public void handleNotification(Notification notification, Object o) {
-    }
-  }
-
   @Test
   public void idShouldNotBeNull() {
     expectThrows(IllegalArgumentException.class,() -> {
@@ -91,7 +75,7 @@ public class RandomGossipManagerBuilderTest {
   public void useMemberListIfProvided() throws URISyntaxException {
     LocalGossipMember member = new LocalGossipMember(
             "aCluster", new URI("udp://localhost:2000"), "aGossipMember",
-            System.nanoTime(), 1000, 1, "exponential");
+            System.nanoTime(), new HashMap<String, String>(), 1000, 1, "exponential");
     List<GossipMember> memberList = new ArrayList<>();
     memberList.add(member);
     RandomGossipManager gossipManager = RandomGossipManager.newBuilder()


Mime
View raw message