gossip-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecapri...@apache.org
Subject [1/2] incubator-gossip git commit: renamed packages from 'google' to 'apache' and updated necessary imports
Date Thu, 02 Jun 2016 15:13:18 GMT
Repository: incubator-gossip
Updated Branches:
  refs/heads/master 4d2ea58ba -> 3ca8e0f9c


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/GossipSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java
new file mode 100644
index 0000000..99b5807
--- /dev/null
+++ b/src/main/java/org/apache/gossip/GossipSettings.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+/**
+ * Holds the timing settings used by the GossipService: how often to gossip and how often to
+ * clean up.
+ *
+ * @author harmenw
+ */
+public class GossipSettings {
+
+  /** Time between gossip'ing in ms. Default is 1 second. */
+  private int gossipInterval = 1000;
+
+  /** Time between cleanups in ms. Default is 10 seconds. */
+  private int cleanupInterval = 10000;
+
+  /**
+   * Construct GossipSettings with default settings.
+   */
+  public GossipSettings() {
+  }
+
+  /**
+   * Construct GossipSettings with given settings.
+   *
+   * @param gossipInterval
+   *          The gossip interval in ms.
+   * @param cleanupInterval
+   *          The cleanup interval in ms.
+   */
+  public GossipSettings(int gossipInterval, int cleanupInterval) {
+    this.gossipInterval = gossipInterval;
+    this.cleanupInterval = cleanupInterval;
+  }
+
+  /**
+   * Set the gossip interval. This is the time between two gossip messages being sent.
+   *
+   * @param gossipInterval
+   *          The gossip interval in ms.
+   */
+  public void setGossipInterval(int gossipInterval) {
+    this.gossipInterval = gossipInterval;
+  }
+
+  /**
+   * Set the gossip interval. Despite its name this method does not configure a timeout; it
+   * writes the same value that {@link #getGossipInterval()} returns.
+   *
+   * @param gossipInterval
+   *          The gossip interval in ms.
+   * @deprecated The name does not match the property it sets; use
+   *             {@link #setGossipInterval(int)} instead.
+   */
+  @Deprecated
+  public void setGossipTimeout(int gossipInterval) {
+    this.gossipInterval = gossipInterval;
+  }
+
+  /**
+   * Set the cleanup interval. This is the time between the last heartbeat received from a member
+   * and when it will be marked as dead.
+   *
+   * @param cleanupInterval
+   *          The cleanup interval in ms.
+   */
+  public void setCleanupInterval(int cleanupInterval) {
+    this.cleanupInterval = cleanupInterval;
+  }
+
+  /**
+   * Get the gossip interval.
+   *
+   * @return The gossip interval in ms.
+   */
+  public int getGossipInterval() {
+    return gossipInterval;
+  }
+
+  /**
+   * Get the cleanup interval.
+   *
+   * @return The cleanup interval in ms.
+   */
+  public int getCleanupInterval() {
+    return cleanupInterval;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/GossipTimeoutTimer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipTimeoutTimer.java b/src/main/java/org/apache/gossip/GossipTimeoutTimer.java
new file mode 100644
index 0000000..2fa09c0
--- /dev/null
+++ b/src/main/java/org/apache/gossip/GossipTimeoutTimer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import java.util.Date;
+
+import javax.management.NotificationListener;
+import javax.management.timer.Timer;
+
+/**
+ * This object represents a timer for a gossip member. When the timer has elapsed without being
+ * reset in the meantime, it will inform the GossipService about this who in turn will put the
+ * gossip member on the dead list, because it is apparantly not alive anymore.
+ * 
+ * @author joshclemm, harmenw
+ */
+public class GossipTimeoutTimer extends Timer {
+
+  private final long sleepTime;
+
+  private final LocalGossipMember source;
+
+  /**
+   * Constructor. Creates a reset-able timer that wakes up after millisecondsSleepTime.
+   * 
+   * @param millisecondsSleepTime
+   *          The time for this timer to wait before an event.
+   * @param notificationListener
+   * @param member
+   */
+  public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener,
+          LocalGossipMember member) {
+    super();
+    sleepTime = millisecondsSleepTime;
+    source = member;
+    addNotificationListener(notificationListener, null, null);
+  }
+
+  /**
+   * @see javax.management.timer.Timer#start()
+   */
+  public void start() {
+    this.reset();
+    super.start();
+  }
+
+  /**
+   * Resets timer to start counting down from original time.
+   */
+  public void reset() {
+    removeAllNotifications();
+    setWakeupTime(sleepTime);
+  }
+
+  /**
+   * Adds a new wake-up time for this timer.
+   * 
+   * @param milliseconds
+   */
+  private void setWakeupTime(long milliseconds) {
+    addNotification("type", "message", source, new Date(System.currentTimeMillis() + milliseconds));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/LocalGossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java
new file mode 100644
index 0000000..55ce257
--- /dev/null
+++ b/src/main/java/org/apache/gossip/LocalGossipMember.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import javax.management.NotificationListener;
+
+/**
+ * A gossip member together with the state that is only known locally, i.e. its timeout timer.
+ * These objects are stored in the local list of gossip members.
+ *
+ * @author harmenw
+ */
+public class LocalGossipMember extends GossipMember {
+
+  /** The timeout timer for this gossip member. Marked transient, so it is not serialized. */
+  private final transient GossipTimeoutTimer timeoutTimer;
+
+  /**
+   * Creates the member and its (not yet started) timeout timer.
+   *
+   * @param clusterName
+   *          The cluster name, passed through to {@link GossipMember}.
+   * @param hostname
+   *          The hostname or IP address.
+   * @param port
+   *          The port number.
+   * @param id
+   *          The member id, passed through to {@link GossipMember}.
+   * @param heartbeat
+   *          The current heartbeat.
+   * @param notificationListener
+   *          The listener that is notified when the timeout timer fires.
+   * @param cleanupTimeout
+   *          The cleanup timeout for this gossip member, used as the timer delay in ms.
+   */
+  public LocalGossipMember(String clusterName, String hostname, int port, String id,
+          long heartbeat, NotificationListener notificationListener, int cleanupTimeout) {
+    super(clusterName, hostname, port, id, heartbeat);
+    this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);
+  }
+
+  /**
+   * Start the timeout timer.
+   */
+  public void startTimeoutTimer() {
+    this.timeoutTimer.start();
+  }
+
+  /**
+   * Reset the timeout timer so that it counts down from the full cleanup timeout again.
+   */
+  public void resetTimeoutTimer() {
+    this.timeoutTimer.reset();
+  }
+
+  /**
+   * Disable the timeout timer by removing all of its pending notifications.
+   */
+  public void disableTimer() {
+    this.timeoutTimer.removeAllNotifications();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/RemoteGossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/RemoteGossipMember.java b/src/main/java/org/apache/gossip/RemoteGossipMember.java
new file mode 100644
index 0000000..899da93
--- /dev/null
+++ b/src/main/java/org/apache/gossip/RemoteGossipMember.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+/**
+ * A gossip member with the properties as received from a remote gossip member.
+ *
+ * @author harmenw
+ */
+public class RemoteGossipMember extends GossipMember {
+
+  /**
+   * Construct a RemoteGossipMember with an explicit heartbeat.
+   *
+   * @param clusterName
+   *          The cluster name, passed through to {@link GossipMember}.
+   * @param hostname
+   *          The hostname or IP address.
+   * @param port
+   *          The port number.
+   * @param id
+   *          The member id, passed through to {@link GossipMember}.
+   * @param heartbeat
+   *          The current heartbeat.
+   */
+  public RemoteGossipMember(String clusterName, String hostname, int port, String id, long heartbeat) {
+    super(clusterName, hostname, port, id, heartbeat);
+  }
+
+  /**
+   * Construct a RemoteGossipMember whose heartbeat is initialised to the current system time in
+   * milliseconds.
+   *
+   * @param clusterName
+   *          The cluster name, passed through to {@link GossipMember}.
+   * @param hostname
+   *          The hostname or IP address.
+   * @param port
+   *          The port number.
+   * @param id
+   *          The member id, passed through to {@link GossipMember}.
+   */
+  public RemoteGossipMember(String clusterName, String hostname, int port, String id) {
+    this(clusterName, hostname, port, id, System.currentTimeMillis());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/StartupSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java
new file mode 100644
index 0000000..176a79b
--- /dev/null
+++ b/src/main/java/org/apache/gossip/StartupSettings.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * This object represents the settings used when starting the gossip service: the service id,
+ * port and cluster, the {@link GossipSettings} and the list of members to start with.
+ *
+ * @author harmenw
+ */
+public class StartupSettings {
+  private static final Logger log = Logger.getLogger(StartupSettings.class);
+
+  /** The id to use for the service. */
+  private String id;
+
+  /** The port to start the gossip service on. */
+  private int port;
+
+  /** The name of the cluster this service belongs to. */
+  private String cluster;
+
+  /** The gossip settings used at startup. */
+  private final GossipSettings gossipSettings;
+
+  /** The list with gossip members to start with. */
+  private final List<GossipMember> gossipMembers;
+
+  /**
+   * Constructor. Uses default {@link GossipSettings}.
+   *
+   * @param id
+   *          The id to be used for this service
+   * @param port
+   *          The port to start the service on.
+   * @param logLevel
+   *          unused
+   * @param cluster
+   *          The name of the cluster.
+   */
+  public StartupSettings(String id, int port, int logLevel, String cluster) {
+    this(id, port, new GossipSettings(), cluster);
+  }
+
+  /**
+   * Constructor. The list of gossip members starts out empty.
+   *
+   * @param id
+   *          The id to be used for this service
+   * @param port
+   *          The port to start the service on.
+   * @param gossipSettings
+   *          The gossip settings to use at startup.
+   * @param cluster
+   *          The name of the cluster.
+   */
+  public StartupSettings(String id, int port, GossipSettings gossipSettings, String cluster) {
+    this.id = id;
+    this.port = port;
+    this.gossipSettings = gossipSettings;
+    this.setCluster(cluster);
+    gossipMembers = new ArrayList<>();
+  }
+
+  /**
+   * Set the name of the cluster.
+   *
+   * @param cluster
+   *          The cluster name.
+   */
+  public void setCluster(String cluster) {
+    this.cluster = cluster;
+  }
+
+  /**
+   * Get the name of the cluster.
+   *
+   * @return The cluster name.
+   */
+  public String getCluster() {
+    return cluster;
+  }
+
+  /**
+   * Set the id to be used for this service.
+   *
+   * @param id
+   *          The id for this service.
+   */
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  /**
+   * Get the id for this service.
+   *
+   * @return the service's id.
+   */
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * Set the port of the gossip service.
+   *
+   * @param port
+   *          The port for the gossip service.
+   */
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  /**
+   * Get the port for the gossip service.
+   *
+   * @return The port of the gossip service.
+   */
+  public int getPort() {
+    return port;
+  }
+
+  /**
+   * Get the GossipSettings.
+   *
+   * @return The GossipSettings object.
+   */
+  public GossipSettings getGossipSettings() {
+    return gossipSettings;
+  }
+
+  /**
+   * Add a gossip member to the list of members to start with.
+   *
+   * @param member
+   *          The member to add.
+   */
+  public void addGossipMember(GossipMember member) {
+    gossipMembers.add(member);
+  }
+
+  /**
+   * Get the list with gossip members. The returned list is the internal list itself, not a copy.
+   *
+   * @return The gossip members.
+   */
+  public List<GossipMember> getGossipMembers() {
+    return gossipMembers;
+  }
+
+  /**
+   * Parse the settings for the gossip service from a JSON file.
+   * <p>
+   * The file must contain a JSON array; only its first element is read. That element is an object
+   * with the keys {@code port}, {@code id}, {@code gossip_interval}, {@code cleanup_interval},
+   * {@code cluster} and {@code members}. Each entry of {@code members} is an object with the keys
+   * {@code cluster}, {@code host} and {@code port}; the members are created with an empty id.
+   *
+   * @param jsonFile
+   *          The file object which refers to the JSON config file.
+   * @return The StartupSettings object with the settings from the config file.
+   * @throws JSONException
+   *           Thrown when the file is not well-formed JSON.
+   * @throws FileNotFoundException
+   *           Thrown when the file cannot be found.
+   * @throws IOException
+   *           Thrown when reading the file gives problems.
+   */
+  public static StartupSettings fromJSONFile(File jsonFile) throws JSONException,
+          FileNotFoundException, IOException {
+    // Read the file to a String. Every line is trimmed and the lines are joined without a
+    // separator. The accumulator is method-local, so an unsynchronized StringBuilder suffices.
+    StringBuilder buffer = new StringBuilder();
+    try (BufferedReader br = new BufferedReader(new FileReader(jsonFile))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        buffer.append(line.trim());
+      }
+    }
+
+    JSONObject jsonObject = new JSONArray(buffer.toString()).getJSONObject(0);
+    int port = jsonObject.getInt("port");
+    String id = jsonObject.getString("id");
+    int gossipInterval = jsonObject.getInt("gossip_interval");
+    int cleanupInterval = jsonObject.getInt("cleanup_interval");
+    String cluster = jsonObject.getString("cluster");
+    // NOTE(review): org.json documents getString as throwing JSONException when the key is
+    // missing, so this check is presumably never triggered; kept as a defensive guard.
+    if (cluster == null) {
+      throw new IllegalArgumentException("cluster was null. It is required");
+    }
+    StartupSettings settings = new StartupSettings(id, port, new GossipSettings(gossipInterval,
+            cleanupInterval), cluster);
+
+    // Now iterate over the members from the config file and add them to the settings, collecting
+    // their addresses for a single log line.
+    StringBuilder configMembersDetails = new StringBuilder("Config-members [");
+    JSONArray membersJSON = jsonObject.getJSONArray("members");
+    for (int i = 0; i < membersJSON.length(); i++) {
+      JSONObject memberJSON = membersJSON.getJSONObject(i);
+      RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("cluster"),
+              memberJSON.getString("host"), memberJSON.getInt("port"), "");
+      settings.addGossipMember(member);
+      configMembersDetails.append(member.getAddress());
+      if (i < (membersJSON.length() - 1)) {
+        configMembersDetails.append(", ");
+      }
+    }
+    log.info(configMembersDetails.append("]").toString());
+
+    // Return the created settings object.
+    return settings;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/event/GossipListener.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/event/GossipListener.java b/src/main/java/org/apache/gossip/event/GossipListener.java
new file mode 100644
index 0000000..2e882f6
--- /dev/null
+++ b/src/main/java/org/apache/gossip/event/GossipListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.event;
+
+import org.apache.gossip.GossipMember;
+
+/**
+ * Callback interface for gossip membership events.
+ * NOTE(review): presumably invoked by the gossip manager when a member changes state; confirm
+ * against the caller.
+ */
+public interface GossipListener {
+  /**
+   * Receives a gossip event.
+   *
+   * @param member
+   *          The member the event concerns.
+   * @param state
+   *          The state reported for the member (see GossipState).
+   */
+  void gossipEvent(GossipMember member, GossipState state);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/event/GossipState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/event/GossipState.java b/src/main/java/org/apache/gossip/event/GossipState.java
new file mode 100644
index 0000000..3b76c9e
--- /dev/null
+++ b/src/main/java/org/apache/gossip/event/GossipState.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.event;
+
+/**
+ * The states a gossip member can be reported in: {@link #UP} or {@link #DOWN}.
+ */
+public enum GossipState {
+  UP("up"),
+  DOWN("down");
+
+  /** Lower-case label of the state; stored, but not read anywhere inside this enum. */
+  @SuppressWarnings("unused")
+  private final String state;
+
+  /**
+   * Enum constructors are implicitly private.
+   *
+   * @param label
+   *          The lower-case label of the state.
+   */
+  GossipState(String label) {
+    this.state = label;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/examples/GossipExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/examples/GossipExample.java b/src/main/java/org/apache/gossip/examples/GossipExample.java
new file mode 100644
index 0000000..e953c77
--- /dev/null
+++ b/src/main/java/org/apache/gossip/examples/GossipExample.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.examples;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+
+/**
+ * This class is an example of how one could use the gossip service. It starts
+ * {@link #NUMBER_OF_CLIENTS} gossip clients on this host, on consecutive ports starting at 2000,
+ * lets them run for a while and then shuts them down.
+ *
+ * @author harmenw
+ */
+public class GossipExample extends Thread {
+  /** The number of clients to start. */
+  private static final int NUMBER_OF_CLIENTS = 4;
+
+  /**
+   * Entry point; creating the example object starts the example thread.
+   *
+   * @param args
+   *          unused
+   */
+  public static void main(String[] args) {
+    new GossipExample();
+  }
+
+  /**
+   * Constructor. This will start this thread.
+   */
+  public GossipExample() {
+    start();
+  }
+
+  /**
+   * Starts the gossip clients one after another, waits, and shuts them down again.
+   *
+   * @see java.lang.Thread#run()
+   */
+  @Override
+  public void run() {
+    try {
+      GossipSettings settings = new GossipSettings();
+
+      List<GossipService> clients = new ArrayList<>();
+
+      // Get my ip address.
+      String myIpAddress = InetAddress.getLocalHost().getHostAddress();
+
+      String cluster = "My Gossip Cluster";
+
+      // Create the gossip members and put them in a list and give them a port number starting with
+      // 2000.
+      List<GossipMember> startupMembers = new ArrayList<>();
+      for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
+        startupMembers.add(new RemoteGossipMember(cluster, myIpAddress, 2000 + i, ""));
+      }
+
+      // Let's start the gossip clients.
+      // Start the clients, waiting cleanup-interval + 1 second between them which will show the
+      // dead list handling.
+      for (GossipMember member : startupMembers) {
+        GossipService gossipService = new GossipService(cluster, myIpAddress, member.getPort(), "",
+                startupMembers, settings, null);
+        clients.add(gossipService);
+        gossipService.start();
+        sleep(settings.getCleanupInterval() + 1000);
+      }
+
+      // After starting all gossip clients, first wait 10 seconds and then shut them down.
+      sleep(10000);
+      System.err.println("Going to shutdown all services...");
+      // Since they all run in the same virtual machine and share the same executor, if one is
+      // shutdown they will all stop.
+      clients.get(0).shutdown();
+
+    } catch (UnknownHostException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      // Restore the interrupt status so the interruption is not silently lost.
+      Thread.currentThread().interrupt();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
new file mode 100644
index 0000000..b966fcb
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.LocalGossipMember;
+
+/**
+ * [The active thread: periodically send gossip request.] The class handles gossiping the membership
+ * list. This information is important to maintaining a common state among all the nodes, and is
+ * important for detecting failures.
+ */
+abstract public class ActiveGossipThread implements Runnable {
+
+  protected final GossipManager gossipManager;
+
+  private final AtomicBoolean keepRunning;
+
+  public ActiveGossipThread(GossipManager gossipManager) {
+    this.gossipManager = gossipManager;
+    this.keepRunning = new AtomicBoolean(true);
+  }
+
+  @Override
+  public void run() {
+    while (keepRunning.get()) {
+      try {
+        TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval());
+        sendMembershipList(gossipManager.getMyself(), gossipManager.getMemberList());
+      } catch (InterruptedException e) {
+        GossipService.LOGGER.error(e);
+        keepRunning.set(false);
+      }
+    }
+    shutdown();
+  }
+
+  public void shutdown() {
+    keepRunning.set(false);
+  }
+
+  /**
+   * Performs the sending of the membership list, after we have incremented our own heartbeat.
+   */
+  abstract protected void sendMembershipList(LocalGossipMember me,
+          List<LocalGossipMember> memberList);
+
+  /**
+   * Abstract method which should be implemented by a subclass. This method should return a member
+   * of the list to gossip with.
+   * 
+   * @param memberList
+   *          The list of members which are stored in the local list of members.
+   * @return The chosen LocalGossipMember to gossip with.
+   */
+  abstract protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
new file mode 100644
index 0000000..80cadf7
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.management.Notification;
+import javax.management.NotificationListener;
+
+import org.apache.log4j.Logger;
+
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
+
+public abstract class GossipManager extends Thread implements NotificationListener {
+
+  public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
+
+  /** Largest JSON payload, in bytes, allowed in a single gossip message. */
+  public static final int MAX_PACKET_SIZE = 102400;
+
+  /** Every known remote member, mapped to the state this node currently assigns to it. */
+  private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members;
+
+  /** The member that represents this node. */
+  private final LocalGossipMember me;
+
+  private final GossipSettings settings;
+
+  /** Polled by {@link #run()}; cleared by {@link #shutdown()}. */
+  private final AtomicBoolean gossipServiceRunning;
+
+  private final Class<? extends PassiveGossipThread> passiveGossipThreadClass;
+
+  private final Class<? extends ActiveGossipThread> activeGossipThreadClass;
+
+  /** Callback for UP/DOWN transitions; may be null, in which case no events are delivered. */
+  private final GossipListener listener;
+
+  /** Created in {@link #run()}; null until then. */
+  private ActiveGossipThread activeGossipThread;
+
+  /** Created in {@link #run()}; null until then. */
+  private PassiveGossipThread passiveGossipThread;
+
+  private final ExecutorService gossipThreadExecutor;
+
+  /**
+   * Creates the manager and registers every startup member other than this node as UP. No threads
+   * are started until {@link #run()} executes.
+   * 
+   * @param passiveGossipThreadClass
+   *          Receiver implementation; must have a constructor taking a GossipManager.
+   * @param activeGossipThreadClass
+   *          Sender implementation; must have a constructor taking a GossipManager.
+   * @param cluster
+   *          Name of the cluster this node belongs to.
+   * @param address
+   *          Host of this node.
+   * @param port
+   *          Port of this node.
+   * @param id
+   *          Identifier of this node.
+   * @param settings
+   *          The gossip settings (intervals).
+   * @param gossipMembers
+   *          The members known at startup.
+   * @param listener
+   *          Callback for state changes, or null.
+   */
+  public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
+          Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster,
+          String address, int port, String id, GossipSettings settings,
+          List<GossipMember> gossipMembers, GossipListener listener) {
+    this.passiveGossipThreadClass = passiveGossipThreadClass;
+    this.activeGossipThreadClass = activeGossipThreadClass;
+    this.settings = settings;
+    me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this,
+            settings.getCleanupInterval());
+    members = new ConcurrentSkipListMap<>();
+    for (GossipMember startupMember : gossipMembers) {
+      if (!startupMember.equals(me)) {
+        LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
+                startupMember.getHost(), startupMember.getPort(), startupMember.getId(),
+                System.currentTimeMillis(), this, settings.getCleanupInterval());
+        members.put(member, GossipState.UP);
+        GossipService.LOGGER.debug(member);
+      }
+    }
+    gossipThreadExecutor = Executors.newCachedThreadPool();
+    gossipServiceRunning = new AtomicBoolean(true);
+    this.listener = listener;
+    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+      public void run() {
+        GossipService.LOGGER.debug("Service has been shutdown...");
+      }
+    }));
+  }
+
+  /**
+   * All timers associated with a member will trigger this method when they go off. The member
+   * carried in the notification's user data is marked DOWN and the listener, if any, is told.
+   */
+  @Override
+  public void handleNotification(Notification notification, Object handback) {
+    LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
+    GossipService.LOGGER.debug("Dead member detected: " + deadMember);
+    members.put(deadMember, GossipState.DOWN);
+    if (listener != null) {
+      listener.gossipEvent(deadMember, GossipState.DOWN);
+    }
+  }
+
+  /**
+   * Replaces a known member with a fresh instance in the UP state. The timers of all existing
+   * entries with the same id are disabled first.
+   * 
+   * @param m
+   *          The replacement member.
+   */
+  public void revivieMember(LocalGossipMember m) {
+    for (Entry<LocalGossipMember, GossipState> it : this.members.entrySet()) {
+      if (it.getKey().getId().equals(m.getId())) {
+        it.getKey().disableTimer();
+      }
+    }
+    members.remove(m);
+    members.put(m, GossipState.UP);
+    if (listener != null) {
+      listener.gossipEvent(m, GossipState.UP);
+    }
+  }
+
+  /**
+   * Stores the member in the UP state and notifies the listener, if any.
+   * 
+   * @param m
+   *          The member to add or mark as UP.
+   */
+  public void createOrRevivieMember(LocalGossipMember m) {
+    members.put(m, GossipState.UP);
+    if (listener != null) {
+      listener.gossipEvent(m, GossipState.UP);
+    }
+  }
+
+  public GossipSettings getSettings() {
+    return settings;
+  }
+
+  /**
+   * 
+   * @return a read only list of members found in the UP state; the list is a new snapshot on
+   *         every call
+   */
+  public List<LocalGossipMember> getMemberList() {
+    List<LocalGossipMember> up = new ArrayList<>();
+    for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
+      if (GossipState.UP.equals(entry.getValue())) {
+        up.add(entry.getKey());
+      }
+    }
+    return Collections.unmodifiableList(up);
+  }
+
+  public LocalGossipMember getMyself() {
+    return me;
+  }
+
+  /**
+   * 
+   * @return a read only list of members found in the DOWN state; the list is a new snapshot on
+   *         every call
+   */
+  public List<LocalGossipMember> getDeadList() {
+    List<LocalGossipMember> down = new ArrayList<>();
+    for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
+      if (GossipState.DOWN.equals(entry.getValue())) {
+        down.add(entry.getKey());
+      }
+    }
+    return Collections.unmodifiableList(down);
+  }
+
+  /**
+   * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
+   * thread and start the receiver thread.
+   */
+  public void run() {
+    for (LocalGossipMember member : members.keySet()) {
+      if (member != me) {
+        member.startTimeoutTimer();
+      }
+    }
+    try {
+      passiveGossipThread = passiveGossipThreadClass.getConstructor(GossipManager.class)
+              .newInstance(this);
+      gossipThreadExecutor.execute(passiveGossipThread);
+      activeGossipThread = activeGossipThreadClass.getConstructor(GossipManager.class)
+              .newInstance(this);
+      gossipThreadExecutor.execute(activeGossipThread);
+    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
+            | InvocationTargetException | NoSuchMethodException | SecurityException e1) {
+      throw new RuntimeException(e1);
+    }
+    GossipService.LOGGER.debug("The GossipService is started.");
+    // Keeps this thread alive until shutdown() clears the flag.
+    // NOTE(review): an interrupt is only logged here and the loop keeps polling - confirm that
+    // interrupting the manager thread is not meant to stop the service.
+    while (gossipServiceRunning.get()) {
+      try {
+        // TODO
+        TimeUnit.MILLISECONDS.sleep(1);
+      } catch (InterruptedException e) {
+        GossipService.LOGGER.warn("The GossipClient was interrupted.");
+      }
+    }
+  }
+
+  /**
+   * Shutdown the gossip service: stops the polling loop in {@link #run()}, asks both gossip
+   * threads to stop and waits up to one second for the executor to terminate.
+   */
+  public void shutdown() {
+    gossipServiceRunning.set(false);
+    gossipThreadExecutor.shutdown();
+    if (passiveGossipThread != null) {
+      passiveGossipThread.shutdown();
+    }
+    if (activeGossipThread != null) {
+      activeGossipThread.shutdown();
+    }
+    try {
+      boolean result = gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+      if (!result) {
+        LOGGER.error("executor shutdown timed out");
+      }
+    } catch (InterruptedException e) {
+      LOGGER.error(e);
+      // Restore the interrupt status so the caller of shutdown() can still observe it.
+      Thread.currentThread().interrupt();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
new file mode 100644
index 0000000..bd7354e
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.apache.gossip.RemoteGossipMember;
+
+/**
+ * [The passive thread: reply to incoming gossip request.] This class handles the passive cycle,
+ * where this client has received an incoming message. For now, this message is always the
+ * membership list, but if you choose to gossip additional information, you will need some logic to
+ * determine the incoming message.
+ */
+abstract public class PassiveGossipThread implements Runnable {
+
+  public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class);
+
+  /** The socket used for the passive thread of the gossip service. */
+  private final DatagramSocket server;
+
+  private final GossipManager gossipManager;
+
+  private final AtomicBoolean keepRunning;
+
+  private final String cluster;
+  
+  private final ObjectMapper MAPPER = new ObjectMapper();
+
+  public PassiveGossipThread(GossipManager gossipManager) {
+    this.gossipManager = gossipManager;
+    try {
+      SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getHost(),
+              gossipManager.getMyself().getPort());
+      server = new DatagramSocket(socketAddress);
+      GossipService.LOGGER.debug("Gossip service successfully initialized on port "
+              + gossipManager.getMyself().getPort());
+      GossipService.LOGGER.debug("I am " + gossipManager.getMyself());
+      cluster = gossipManager.getMyself().getClusterName();
+      if (cluster == null){
+        throw new IllegalArgumentException("cluster was null");
+      }
+    } catch (SocketException ex) {
+      GossipService.LOGGER.warn(ex);
+      throw new RuntimeException(ex);
+    }
+    keepRunning = new AtomicBoolean(true);
+  }
+
+  @Override
+  public void run() {
+    while (keepRunning.get()) {
+      try {
+        byte[] buf = new byte[server.getReceiveBufferSize()];
+        DatagramPacket p = new DatagramPacket(buf, buf.length);
+        server.receive(p);
+        int packet_length = 0;
+        for (int i = 0; i < 4; i++) {
+          int shift = (4 - 1 - i) * 8;
+          packet_length += (buf[i] & 0x000000FF) << shift;
+        }
+        if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
+          byte[] json_bytes = new byte[packet_length];
+          for (int i = 0; i < packet_length; i++) {
+            json_bytes[i] = buf[i + 4];
+          }
+          if (GossipService.LOGGER.isDebugEnabled()){
+            String receivedMessage = new String(json_bytes);
+            GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): "
+                  + receivedMessage);
+          }
+          try {
+            List<GossipMember> remoteGossipMembers = new ArrayList<>();
+            RemoteGossipMember senderMember = null;
+            ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes,
+                    ActiveGossipMessage.class);
+            for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
+              RemoteGossipMember member = new RemoteGossipMember(
+                      activeGossipMessage.getMembers().get(i).getCluster(),
+                      activeGossipMessage.getMembers().get(i).getHost(),
+                      activeGossipMessage.getMembers().get(i).getPort(),
+                      activeGossipMessage.getMembers().get(i).getId(),
+                      activeGossipMessage.getMembers().get(i).getHeartbeat());
+              if (!(member.getClusterName().equals(cluster))){
+                GossipService.LOGGER.warn("Note a member of this cluster " + i);
+                continue;
+              }
+              // This is the first member found, so this should be the member who is communicating
+              // with me.
+              if (i == 0) {
+                senderMember = member;
+              } 
+              remoteGossipMembers.add(member);
+            }
+            mergeLists(gossipManager, senderMember, remoteGossipMembers);
+          } catch (RuntimeException ex) {
+            GossipService.LOGGER.error("Unable to process message", ex);
+          }
+        } else {
+          GossipService.LOGGER
+                  .error("The received message is not of the expected size, it has been dropped.");
+        }
+
+      } catch (IOException e) {
+        GossipService.LOGGER.error(e);
+        System.out.println(e);
+        keepRunning.set(false);
+      }
+    }
+    shutdown();
+  }
+
+  public void shutdown() {
+    try {
+      server.close();
+    } catch (RuntimeException ex) {
+    }
+  }
+
+  /**
+   * Abstract method for merging the local and remote list.
+   * 
+   * @param gossipManager
+   *          The GossipManager for retrieving the local members and dead members list.
+   * @param senderMember
+   *          The member who is sending this list, this could be used to send a response if the
+   *          remote list contains out-dated information.
+   * @param remoteList
+   *          The list of members known at the remote side.
+   */
+  abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
+          List<GossipMember> remoteList);
+}
+
+/*
+ * random comments // Check whether the packet is smaller than the maximal packet length. // A
+ * packet larger than this could not have been sent from a GossipService, // since this is
+ * checked before sending the message. // This could normally only occur when the list of members is
+ * very big, // or when the packet is malformed and the first 4 bytes are no longer the right int.
+ * // For this reason we disregard the message.
+ */

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
new file mode 100644
index 0000000..edf21f3
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.impl;
+
+import java.util.List;
+
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.PassiveGossipThread;
+
+public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread {
+
+  public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) {
+    super(gossipManager);
+  }
+
+  /**
+   * Merge remote list (received from peer), and our local member list. Simply, we must update the
+   * heartbeats that the remote list has with our list. Also, some additional logic is needed to
+   * make sure we have not timed out a member and then immediately received a list with that member.
+   * 
+   * @param gossipManager
+   * @param senderMember
+   * @param remoteList
+   */
+  protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
+          List<GossipMember> remoteList) {
+
+    // if the person sending to us is in the dead list consider them up
+    for (LocalGossipMember i : gossipManager.getDeadList()) {
+      if (i.getId().equals(senderMember.getId())) {
+        System.out.println(gossipManager.getMyself() + " caught a live one!");
+        LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
+                senderMember.getHost(), senderMember.getPort(), senderMember.getId(),
+                senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
+                        .getCleanupInterval());
+        gossipManager.revivieMember(newLocalMember);
+        newLocalMember.startTimeoutTimer();
+      }
+    }
+    for (GossipMember remoteMember : remoteList) {
+      if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
+        continue;
+      }
+      if (gossipManager.getMemberList().contains(remoteMember)) {
+        LocalGossipMember localMember = gossipManager.getMemberList().get(
+                gossipManager.getMemberList().indexOf(remoteMember));
+        if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
+          localMember.setHeartbeat(remoteMember.getHeartbeat());
+          localMember.resetTimeoutTimer();
+        }
+      } else if (!gossipManager.getMemberList().contains(remoteMember)
+              && !gossipManager.getDeadList().contains(remoteMember)) {
+        LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
+                remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
+                remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
+                        .getCleanupInterval());
+        gossipManager.createOrRevivieMember(newLocalMember);
+        newLocalMember.startTimeoutTimer();
+      } else {
+        if (gossipManager.getDeadList().contains(remoteMember)) {
+          LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
+                  gossipManager.getDeadList().indexOf(remoteMember));
+          if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
+            LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
+                    remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
+                    remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
+                            .getCleanupInterval());
+            gossipManager.revivieMember(newLocalMember);
+            newLocalMember.startTimeoutTimer();
+            GossipService.LOGGER.debug("Removed remote member " + remoteMember.getAddress()
+                    + " from dead list and added to local member list.");
+          } else {
+            GossipService.LOGGER.debug("me " + gossipManager.getMyself());
+            GossipService.LOGGER.debug("sender " + senderMember);
+            GossipService.LOGGER.debug("remote " + remoteList);
+            GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
+            GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
+          }
+        } else {
+          GossipService.LOGGER.debug("me " + gossipManager.getMyself());
+          GossipService.LOGGER.debug("sender " + senderMember);
+          GossipService.LOGGER.debug("remote " + remoteList);
+          GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
+          GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
+          // throw new IllegalArgumentException("wtf");
+        }
+      }
+    }
+  }
+
+}
+
+/**
+ * old comment section: // If a member is restarted the heartbeat will restart from 1, so we should
+ * check // that here. // So a member can come back from the dead when its heartbeat is either larger
+ * than a previous // heartbeat (due to network failure) // or when the heartbeat is 1 (after a restart of
+ * the service). // TODO: What if the first message of a gossip service is sent to a dead node? The
+ * // second member will receive a heartbeat of two. // TODO: The above does happen. Maybe a special
+ * message for a revived member? // TODO: Or maybe when a member is declared dead for more than //
+ * _settings.getCleanupInterval() ms, reset the heartbeat to 0. // It will then accept a revived
+ * member. // The above is now handled by checking whether the heartbeat differs //
+ * _settings.getCleanupInterval(), it must be restarted.
+ */
+
+/*
+ * // The remote member is back from the dead. // Remove it from the dead list. //
+ * gossipManager.getDeadList().remove(localDeadMember); // Add it as a new member and add it to the
+ * member list.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java
new file mode 100644
index 0000000..16d0d32
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.impl;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.manager.ActiveGossipThread;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.gossip.model.GossipMember;
+import org.codehaus.jackson.map.ObjectMapper;
+
+abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
+
+  protected ObjectMapper om = new ObjectMapper();
+  
+  public SendMembersActiveGossipThread(GossipManager gossipManager) {
+    super(gossipManager);
+  }
+
+  /** Copies the fields of a local member into the bean that is serialized into the message. */
+  private GossipMember convert(LocalGossipMember member){
+    GossipMember gm = new GossipMember();
+    gm.setCluster(member.getClusterName());
+    gm.setHeartbeat(member.getHeartbeat());
+    gm.setHost(member.getHost());
+    gm.setId(member.getId());
+    gm.setPort(member.getPort());
+    return gm;
+  }
+  
+  /**
+   * Sets our own heartbeat to the current time and sends the membership list (ourselves first,
+   * then the given members) as one datagram to the partner chosen by
+   * {@link #selectPartner(List)}. Nothing is sent when no partner is selected or when the
+   * serialized list is larger than {@link GossipManager#MAX_PACKET_SIZE}.
+   * 
+   * @param me
+   *          The member that represents this node.
+   * @param memberList
+   *          The members to choose a partner from and to include in the message.
+   */
+  @Override
+  protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
+    GossipService.LOGGER.debug("Send sendMembershipList() is called.");
+    me.setHeartbeat(System.currentTimeMillis());
+    LocalGossipMember member = selectPartner(memberList);
+    if (member == null) {
+      return;
+    }
+    try (DatagramSocket socket = new DatagramSocket()) {
+      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
+      InetAddress dest = InetAddress.getByName(member.getHost());
+      ActiveGossipMessage message = new ActiveGossipMessage();
+      message.getMembers().add(convert(me));
+      for (LocalGossipMember other : memberList) {
+        message.getMembers().add(convert(other));
+      }
+      // Encode explicitly as UTF-8 rather than with the platform default charset, so the bytes
+      // on the wire do not depend on the sending machine's locale.
+      byte[] jsonBytes = om.writeValueAsString(message).getBytes("UTF-8");
+      int packetLength = jsonBytes.length;
+      // Inclusive bound, matching the check the receiving side applies to the length prefix.
+      if (packetLength <= GossipManager.MAX_PACKET_SIZE) {
+        byte[] buf = createBuffer(packetLength, jsonBytes);
+        DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort());
+        socket.send(datagramPacket);
+      } else {
+        GossipService.LOGGER.error("The length of the to be send message is too large ("
+                + packetLength + " > " + GossipManager.MAX_PACKET_SIZE + ").");
+      }
+    } catch (IOException e1) {
+      GossipService.LOGGER.warn(e1);
+    }
+  }
+
+  /**
+   * Builds the datagram payload: the payload length as a 4-byte big-endian integer, followed by
+   * the JSON bytes.
+   */
+  private byte[] createBuffer(int packetLength, byte[] jsonBytes) {
+    // A new ByteBuffer is big-endian, so putInt writes the most significant byte first.
+    ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length);
+    byteBuffer.putInt(packetLength);
+    byteBuffer.put(jsonBytes);
+    return byteBuffer.array();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
new file mode 100644
index 0000000..23a41f5
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.random;
+
+import java.util.List;
+import java.util.Random;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.impl.SendMembersActiveGossipThread;
+
+public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
+
+  /** Source of randomness for picking the gossip partner. */
+  private final Random random = new Random();
+
+  public RandomActiveGossipThread(GossipManager gossipManager) {
+    super(gossipManager);
+  }
+
+  /**
+   * Picks a uniformly random entry of the given member list.
+   * 
+   * @param memberList
+   *          The candidates to choose from.
+   * @return a random member of the list, or null when the list is empty
+   */
+  @Override
+  protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
+    if (memberList.isEmpty()) {
+      GossipService.LOGGER.debug("I am alone in this world.");
+      return null;
+    }
+    return memberList.get(random.nextInt(memberList.size()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
new file mode 100644
index 0000000..0122610
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.random;
+
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
+
+import java.util.List;
+
+/**
+ * A GossipManager wired with {@link OnlyProcessReceivedPassiveGossipThread} as its passive thread
+ * and {@link RandomActiveGossipThread} as its active thread.
+ */
+public class RandomGossipManager extends GossipManager {
+
+  /**
+   * Passes every argument through to the GossipManager constructor, together with the two thread
+   * classes named above.
+   */
+  public RandomGossipManager(
+          String cluster,
+          String address,
+          int port,
+          String id,
+          GossipSettings settings,
+          List<GossipMember> gossipMembers,
+          GossipListener listener) {
+    super(
+            OnlyProcessReceivedPassiveGossipThread.class,
+            RandomActiveGossipThread.class,
+            cluster,
+            address,
+            port,
+            id,
+            settings,
+            gossipMembers,
+            listener);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
new file mode 100644
index 0000000..ac940d8
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
@@ -0,0 +1,22 @@
+package org.apache.gossip.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActiveGossipMessage {
+  // Members carried by this message; replaced wholesale by setMembers (which may store null).
+  private List<GossipMember> members;
+
+  /** Starts the message with an empty, mutable member list. */
+  public ActiveGossipMessage(){
+    this.members = new ArrayList<>();
+  }
+
+  public List<GossipMember> getMembers() {
+    return this.members;
+  }
+
+  public void setMembers(List<GossipMember> members) {
+    this.members = members;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/main/java/org/apache/gossip/model/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/model/GossipMember.java b/src/main/java/org/apache/gossip/model/GossipMember.java
new file mode 100644
index 0000000..8dc6bf7
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/GossipMember.java
@@ -0,0 +1,63 @@
+package org.apache.gossip.model;
+
+public class GossipMember {
+  // Plain mutable data holder for one member; every field is a nullable reference type.
+  private String cluster;
+  private String host;
+  private Integer port;
+  private String id;
+  private Long heartbeat;
+
+  /** Creates a member with every field left null; populate it through the setters. */
+  public GossipMember(){
+  }
+
+  /** Creates a fully populated member; heartbeat is now stored (it used to be silently dropped). */
+  public GossipMember(String cluster, String host, Integer port, String id, Long heartbeat){
+    this.cluster = cluster;
+    this.host = host;
+    this.port = port;
+    this.id = id;
+    this.heartbeat = heartbeat;
+  }
+
+  public String getCluster() {
+    return cluster;
+  }
+
+  public void setCluster(String cluster) {
+    this.cluster = cluster;
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  public Integer getPort() {
+    return port;
+  }
+
+  public void setPort(Integer port) {
+    this.port = port;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  public Long getHeartbeat() {
+    return heartbeat;
+  }
+
+  public void setHeartbeat(Long heartbeat) {
+    this.heartbeat = heartbeat;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
index af30eb7..2d8190b 100644
--- a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
@@ -31,12 +31,12 @@ import java.util.concurrent.TimeUnit;
 import org.apache.log4j.Logger;
 import org.junit.Test;
 
-import com.google.code.gossip.GossipMember;
-import com.google.code.gossip.GossipService;
-import com.google.code.gossip.GossipSettings;
-import com.google.code.gossip.RemoteGossipMember;
-import com.google.code.gossip.event.GossipListener;
-import com.google.code.gossip.event.GossipState;
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
 
 public class ShutdownDeadtimeTest {
 

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/test/java/io/teknek/gossip/StartupSettingsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/io/teknek/gossip/StartupSettingsTest.java b/src/test/java/io/teknek/gossip/StartupSettingsTest.java
index bf6710e..aa4e404 100644
--- a/src/test/java/io/teknek/gossip/StartupSettingsTest.java
+++ b/src/test/java/io/teknek/gossip/StartupSettingsTest.java
@@ -17,10 +17,10 @@
  */
 package io.teknek.gossip;
 
-import com.google.code.gossip.GossipMember;
-import com.google.code.gossip.GossipService;
-import com.google.code.gossip.GossipSettings;
-import com.google.code.gossip.StartupSettings;
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.StartupSettings;
 import org.apache.log4j.Logger;
 import org.json.JSONException;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3ca8e0f9/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
index 277d0fe..4e731ae 100644
--- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
@@ -30,12 +30,12 @@ import java.util.concurrent.TimeUnit;
 import org.apache.log4j.Logger;
 import org.junit.Test;
 
-import com.google.code.gossip.GossipMember;
-import com.google.code.gossip.GossipService;
-import com.google.code.gossip.GossipSettings;
-import com.google.code.gossip.RemoteGossipMember;
-import com.google.code.gossip.event.GossipListener;
-import com.google.code.gossip.event.GossipState;
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
 
 public class TenNodeThreeSeedTest {
   private static final Logger log = Logger.getLogger( TenNodeThreeSeedTest.class );


Mime
View raw message