gossip-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecapri...@apache.org
Subject incubator-gossip git commit: GOSSIP-44 Remove 4 byte header
Date Sat, 21 Jan 2017 03:46:44 GMT
Repository: incubator-gossip
Updated Branches:
  refs/heads/master 428c0573f -> b2af44907


GOSSIP-44 Remove 4 byte header


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/b2af4490
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/b2af4490
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/b2af4490

Branch: refs/heads/master
Commit: b2af44907436bd694de2a08254f94acd971f30f2
Parents: 428c057
Author: Edward Capriolo <edlinuxguru@gmail.com>
Authored: Fri Jan 20 11:40:52 2017 -0500
Committer: Edward Capriolo <edlinuxguru@gmail.com>
Committed: Fri Jan 20 22:45:57 2017 -0500

----------------------------------------------------------------------
 .../apache/gossip/examples/StandAloneNode.java  |   2 +-
 .../gossip/manager/ActiveGossipThread.java      | 108 +++++++------------
 .../org/apache/gossip/manager/GossipCore.java   |  71 +++++++-----
 .../gossip/manager/GossipCoreConstants.java     |  30 ++++++
 .../apache/gossip/manager/GossipManager.java    |   5 +-
 .../gossip/manager/PassiveGossipThread.java     |  27 ++---
 .../java/org/apache/gossip/manager/UdpUtil.java |  45 --------
 .../apache/gossip/manager/DataReaperTest.java   |  26 +++--
 8 files changed, 138 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/b2af4490/src/main/java/org/apache/gossip/examples/StandAloneNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
index c12f946..d24c0fa 100644
--- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
@@ -31,7 +31,7 @@ public class StandAloneNode {
     GossipSettings s = new GossipSettings();
     s.setWindowSize(10);
     s.setConvictThreshold(1.0);
-    s.setGossipInterval(10);
+    s.setGossipInterval(1000);
     GossipService gossipService = new GossipService("mycluster",  URI.create(args[0]), args[1],
             Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])),
s, (a,b) -> {}, new MetricRegistry());
     gossipService.start();

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/b2af4490/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
index 731b019..f81565b 100644
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -17,7 +17,6 @@
  */
 package org.apache.gossip.manager;
 
-import java.io.IOException;
 import java.util.List;
 
 import java.util.Map.Entry;
@@ -44,8 +43,6 @@ import org.apache.gossip.udp.UdpGossipDataMessage;
 import org.apache.gossip.udp.UdpSharedGossipDataMessage;
 import org.apache.log4j.Logger;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import static com.codahale.metrics.MetricRegistry.name;
 
 /**
@@ -61,7 +58,6 @@ public class ActiveGossipThread {
   private ScheduledExecutorService scheduledExecutorService;
   private final BlockingQueue<Runnable> workQueue;
   private ThreadPoolExecutor threadService;
-  private ObjectMapper MAPPER = new ObjectMapper();
 
   private final Histogram sharedDataHistogram;
   private final Histogram sendPerNodeDataHistogram;
@@ -114,28 +110,17 @@ public class ActiveGossipThread {
       LOGGER.debug("Send sendMembershipList() is called without action");
       sharedDataHistogram.update(System.currentTimeMillis() - startTime);
       return;
-    }
-    try {
-      for (Entry<String, SharedGossipDataMessage> innerEntry : this.gossipCore.getSharedData().entrySet()){
-          UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
-          message.setUuid(UUID.randomUUID().toString());
-          message.setUriFrom(me.getId());
-          message.setExpireAt(innerEntry.getValue().getExpireAt());
-          message.setKey(innerEntry.getValue().getKey());
-          message.setNodeId(innerEntry.getValue().getNodeId());
-          message.setTimestamp(innerEntry.getValue().getTimestamp());
-          message.setPayload(innerEntry.getValue().getPayload());
-          byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
-          int packet_length = json_bytes.length;
-          if (packet_length < GossipManager.MAX_PACKET_SIZE) {
-            gossipCore.sendOneWay(message, member.getUri());
-          } else {
-            LOGGER.error("The length of the to be send message is too large ("
-                    + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
-          }
-      }
-    } catch (IOException e1) {
-      LOGGER.warn(e1);
+    }    
+    for (Entry<String, SharedGossipDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
+        UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
+        message.setUuid(UUID.randomUUID().toString());
+        message.setUriFrom(me.getId());
+        message.setExpireAt(innerEntry.getValue().getExpireAt());
+        message.setKey(innerEntry.getValue().getKey());
+        message.setNodeId(innerEntry.getValue().getNodeId());
+        message.setTimestamp(innerEntry.getValue().getTimestamp());
+        message.setPayload(innerEntry.getValue().getPayload());
+        gossipCore.sendOneWay(message, member.getUri());
     }
     sharedDataHistogram.update(System.currentTimeMillis() - startTime);
   }
@@ -148,36 +133,26 @@ public class ActiveGossipThread {
       LOGGER.debug("Send sendMembershipList() is called without action");
       sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
       return;
-    }
-    try {
-      for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry
: gossipCore.getPerNodeData().entrySet()){
-        for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
-          UdpGossipDataMessage message = new UdpGossipDataMessage();
-          message.setUuid(UUID.randomUUID().toString());
-          message.setUriFrom(me.getId());
-          message.setExpireAt(innerEntry.getValue().getExpireAt());
-          message.setKey(innerEntry.getValue().getKey());
-          message.setNodeId(innerEntry.getValue().getNodeId());
-          message.setTimestamp(innerEntry.getValue().getTimestamp());
-          message.setPayload(innerEntry.getValue().getPayload());
-          byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
-          int packet_length = json_bytes.length;
-          if (packet_length < GossipManager.MAX_PACKET_SIZE) {
-            gossipCore.sendOneWay(message, member.getUri());
-          } else {
-            LOGGER.error("The length of the to be send message is too large ("
-                    + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
-          }
-        }
+    }    
+    for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry :
gossipCore.getPerNodeData().entrySet()){
+      for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
+        UdpGossipDataMessage message = new UdpGossipDataMessage();
+        message.setUuid(UUID.randomUUID().toString());
+        message.setUriFrom(me.getId());
+        message.setExpireAt(innerEntry.getValue().getExpireAt());
+        message.setKey(innerEntry.getValue().getKey());
+        message.setNodeId(innerEntry.getValue().getNodeId());
+        message.setTimestamp(innerEntry.getValue().getTimestamp());
+        message.setPayload(innerEntry.getValue().getPayload());
+        gossipCore.sendOneWay(message, member.getUri());   
       }
-    } catch (IOException e1) {
-      LOGGER.warn(e1);
     }
     sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
   }
   
   protected void sendToALiveMember(){
     LocalGossipMember member = selectPartner(gossipManager.getLiveMembers());
+    System.out.println("send" );
     sendMembershipList(gossipManager.getMyself(), member);
   }
   
@@ -199,29 +174,18 @@ public class ActiveGossipThread {
     } else {
       LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
     }
-    try {
-      UdpActiveGossipMessage message = new UdpActiveGossipMessage();
-      message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
-      message.setUuid(UUID.randomUUID().toString());
-      message.getMembers().add(convert(me));
-      for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
-        message.getMembers().add(convert(other));
-      }
-      byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
-      int packet_length = json_bytes.length;
-      if (packet_length < GossipManager.MAX_PACKET_SIZE) {
-        Response r = gossipCore.send(message, member.getUri());
-        if (r instanceof ActiveGossipOk){
-          //maybe count metrics here
-        } else {
-          LOGGER.debug("Message " + message + " generated response " + r);
-        }
-      } else {
-        LOGGER.error("The length of the to be send message is too large ("
-                + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
-      }
-    } catch (IOException e1) {
-      LOGGER.warn(e1);
+    UdpActiveGossipMessage message = new UdpActiveGossipMessage();
+    message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
+    message.setUuid(UUID.randomUUID().toString());
+    message.getMembers().add(convert(me));
+    for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
+      message.getMembers().add(convert(other));
+    }
+    Response r = gossipCore.send(message, member.getUri());
+    if (r instanceof ActiveGossipOk){
+      //maybe count metrics here
+    } else {
+      LOGGER.debug("Message " + message + " generated response " + r);
     }
     sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/b2af4490/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index d315361..5d561c3 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -31,7 +31,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -53,27 +52,42 @@ import org.apache.gossip.udp.UdpNotAMemberFault;
 import org.apache.gossip.udp.UdpSharedGossipDataMessage;
 import org.apache.log4j.Logger;
 
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 
-public class GossipCore {
+public class GossipCore implements GossipCoreConstants {
   
   public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private final GossipManager gossipManager;
   private ConcurrentHashMap<String, Base> requests;
-  private ExecutorService service;
+  private ThreadPoolExecutor service;
   private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>
perNodeData;
   private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData;
   private final BlockingQueue<Runnable> workQueue;
+  private final Meter messageSerdeException;
+  private final Meter tranmissionException;
+  private final Meter tranmissionSuccess;
   
-  public GossipCore(GossipManager manager){
+  public GossipCore(GossipManager manager, MetricRegistry metrics){
     this.gossipManager = manager;
     requests = new ConcurrentHashMap<>();
     workQueue = new ArrayBlockingQueue<>(1024);
     service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
     perNodeData = new ConcurrentHashMap<>();
     sharedData = new ConcurrentHashMap<>();
+    metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size());
+    metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
+    metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() ->  sharedData.size());
+    metrics.register(REQUEST_SIZE, (Gauge<Integer>)() ->  requests.size());
+    metrics.register(THREADPOOL_ACTIVE, (Gauge<Integer>)() ->  service.getActiveCount());
+    metrics.register(THREADPOOL_SIZE, (Gauge<Integer>)() ->  service.getPoolSize());
+    messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
+    tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
+    tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
   }
   
   public void addSharedData(SharedGossipDataMessage message){
@@ -175,29 +189,29 @@ public class GossipCore {
   }
   
   /**
-   * Sends a blocking  message. Throws exception when tranmission fails 
+   * Sends a blocking message.  
    * @param message
    * @param uri
+   * @throws RuntimeException if data can not be serialized or in transmission error
    */
   private void sendInternal(Base message, URI uri){
     byte[] json_bytes;
     try {
       json_bytes = MAPPER.writeValueAsString(message).getBytes();
     } catch (IOException e) {
+      messageSerdeException.mark();
       throw new RuntimeException(e);
     }
-    int packet_length = json_bytes.length;
-    if (packet_length < GossipManager.MAX_PACKET_SIZE) {
-      byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes);
-      try (DatagramSocket socket = new DatagramSocket()) {
-        socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
-        InetAddress dest = InetAddress.getByName(uri.getHost());
-        DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, uri.getPort());
-        socket.send(datagramPacket);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      } 
-    }
+    try (DatagramSocket socket = new DatagramSocket()) {
+      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
+      InetAddress dest = InetAddress.getByName(uri.getHost());
+      DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest,
uri.getPort());
+      socket.send(datagramPacket);
+      tranmissionSuccess.mark();
+    } catch (IOException e) {
+      tranmissionException.mark();
+      throw new RuntimeException(e);
+    } 
   }
   
   public Response send(Base message, URI uri){
@@ -225,7 +239,7 @@ public class GossipCore {
             return (Response) b;
           }
           try {
-            Thread.sleep(0, 1000);
+            Thread.sleep(0, 555555);
           } catch (InterruptedException e) {
             
           }
@@ -261,19 +275,20 @@ public class GossipCore {
   public void sendOneWay(Base message, URI u){
     byte[] json_bytes;
     try {
-      json_bytes = MAPPER.writeValueAsString(message).getBytes();
+      json_bytes = MAPPER.writeValueAsBytes(message);
     } catch (IOException e) {
+      messageSerdeException.mark();
       throw new RuntimeException(e);
     }
-    int packet_length = json_bytes.length;
-    if (packet_length < GossipManager.MAX_PACKET_SIZE) {
-      byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes);
-      try (DatagramSocket socket = new DatagramSocket()) {
-        socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
-        InetAddress dest = InetAddress.getByName(u.getHost());
-        DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, u.getPort());
-        socket.send(datagramPacket);
-      } catch (IOException ex) { }
+    try (DatagramSocket socket = new DatagramSocket()) {
+      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
+      InetAddress dest = InetAddress.getByName(u.getHost());
+      DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest,
u.getPort());
+      socket.send(datagramPacket);
+      tranmissionSuccess.mark();
+    } catch (IOException ex) { 
+      tranmissionException.mark();
+      LOGGER.debug("Send one way failed", ex);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/b2af4490/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java b/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java
new file mode 100644
index 0000000..6d3765a
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+public interface GossipCoreConstants {
+  String WORKQUEUE_SIZE = "gossip.core.workqueue.size";
+  String PER_NODE_DATA_SIZE = "gossip.core.pernodedata.size"; 
+  String SHARED_DATA_SIZE = "gossip.core.shareddata.size";
+  String REQUEST_SIZE = "gossip.core.requests.size";
+  String THREADPOOL_ACTIVE = "gossip.core.threadpool.active";
+  String THREADPOOL_SIZE = "gossip.core.threadpool.size";
+  String MESSAGE_SERDE_EXCEPTION = "gossip.core.message_serde_exception";
+  String MESSAGE_TRANSMISSION_EXCEPTION = "gossip.core.message_transmission_exception";
+  String MESSAGE_TRANSMISSION_SUCCESS = "gossip.core.message_transmission_success";
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/b2af4490/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index fb7ec93..cf67c9c 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -49,8 +49,6 @@ public abstract class GossipManager {
 
   public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
 
-  public static final int MAX_PACKET_SIZE = 102400;
-
   private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members;
 
   private final LocalGossipMember me;
@@ -82,7 +80,7 @@ public abstract class GossipManager {
           List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry
registry) {
     
     this.settings = settings;
-    gossipCore = new GossipCore(this);
+    gossipCore = new GossipCore(this, registry);
     clock = new SystemClock();
     dataReaper = new DataReaper(gossipCore, clock);
     me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(),
@@ -256,4 +254,5 @@ public abstract class GossipManager {
     return dataReaper;
   }
             
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/b2af4490/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index b54a963..ebda513 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -78,23 +78,13 @@ abstract public class PassiveGossipThread implements Runnable {
         byte[] buf = new byte[server.getReceiveBufferSize()];
         DatagramPacket p = new DatagramPacket(buf, buf.length);
         server.receive(p);
-        int packet_length = UdpUtil.readPacketLengthFromBuffer(buf);
-        if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
-          byte[] json_bytes = new byte[packet_length];
-          for (int i = 0; i < packet_length; i++) {
-            json_bytes[i] = buf[i + 4];
-          }
-          debug(packet_length, json_bytes);
-          try {
-            Base activeGossipMessage = MAPPER.readValue(json_bytes, Base.class);
-            gossipCore.receive(activeGossipMessage);
-          } catch (RuntimeException ex) {//TODO trap json exception
-            LOGGER.error("Unable to process message", ex);
-          }
-        } else {
-          LOGGER.error("The received message is not of the expected size, it has been dropped.");
+        debug(p.getData());
+        try {
+          Base activeGossipMessage = MAPPER.readValue(p.getData(), Base.class);
+          gossipCore.receive(activeGossipMessage);
+        } catch (RuntimeException ex) {//TODO trap json exception
+          LOGGER.error("Unable to process message", ex);
         }
-
       } catch (IOException e) {
         LOGGER.error(e);
         keepRunning.set(false);
@@ -103,11 +93,10 @@ abstract public class PassiveGossipThread implements Runnable {
     shutdown();
   }
 
-  private void debug(int packetLength, byte[] jsonBytes) {
+  private void debug(byte[] jsonBytes) {
     if (LOGGER.isDebugEnabled()){
       String receivedMessage = new String(jsonBytes);
-      LOGGER.debug("Received message (" + packetLength + " bytes): "
-            + receivedMessage);
+      LOGGER.debug("Received message ( bytes): " + receivedMessage);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/b2af4490/src/main/java/org/apache/gossip/manager/UdpUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/UdpUtil.java b/src/main/java/org/apache/gossip/manager/UdpUtil.java
deleted file mode 100644
index c61769f..0000000
--- a/src/main/java/org/apache/gossip/manager/UdpUtil.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-import java.nio.ByteBuffer;
-
-public class UdpUtil {
-
-  public static int readPacketLengthFromBuffer(byte [] buffer){
-    int packetLength = 0;
-    for (int i = 0; i < 4; i++) {
-      int shift = (4 - 1 - i) * 8;
-      packetLength += (buffer[i] & 0x000000FF) << shift;
-    }
-    return packetLength;
-  }
-  
-  public static byte[] createBuffer(int packetLength, byte[] jsonBytes) {
-    byte[] lengthBytes = new byte[4];
-    lengthBytes[0] = (byte) (packetLength >> 24);
-    lengthBytes[1] = (byte) ((packetLength << 8) >> 24);
-    lengthBytes[2] = (byte) ((packetLength << 16) >> 24);
-    lengthBytes[3] = (byte) ((packetLength << 24) >> 24);
-    ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length);
-    byteBuffer.put(lengthBytes);
-    byteBuffer.put(jsonBytes);
-    byte[] buf = byteBuffer.array();
-    return buf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/b2af4490/src/test/java/org/apache/gossip/manager/DataReaperTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
index 5388bb3..b4ac45d 100644
--- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java
+++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
@@ -32,27 +32,37 @@ import io.teknek.tunit.TUnit;
 public class DataReaperTest {
 
   private final MetricRegistry registry = new MetricRegistry();
-
+  String myId = "4";
+  String key = "key";
+  String value = "a";
+  
   @Test
   public void testReaperOneShot() {
-    String myId = "4";
-    String key = "key";
-    String value = "a";
     GossipSettings settings = new GossipSettings();
     GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
             .withId(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build();
     gm.init();
     gm.gossipPerNodeData(perNodeDatum(key, value));
     gm.gossipSharedData(sharedDatum(key, value));
-    Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
-    Assert.assertEquals(value, gm.findSharedGossipData(key).getPayload());
+    assertDataIsAtCorrectValue(gm);
     gm.getDataReaper().runPerNodeOnce();
     gm.getDataReaper().runSharedOnce();
-    TUnit.assertThat(() -> gm.findPerNodeGossipData(myId, key)).equals(null);
-    TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null);
+    assertDataIsRemoved(gm);
     gm.shutdown();
   }
 
+  private void assertDataIsAtCorrectValue(GossipManager gm){
+    Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
+    Assert.assertEquals(1, registry.getGauges().get(GossipCoreConstants.PER_NODE_DATA_SIZE).getValue());
+    Assert.assertEquals(value, gm.findSharedGossipData(key).getPayload());
+    Assert.assertEquals(1, registry.getGauges().get(GossipCoreConstants.SHARED_DATA_SIZE).getValue());
+  }
+  
+  private void assertDataIsRemoved(GossipManager gm){
+    TUnit.assertThat(() -> gm.findPerNodeGossipData(myId, key)).equals(null);
+    TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null);
+  }
+  
   private GossipDataMessage perNodeDatum(String key, String value) {
     GossipDataMessage m = new GossipDataMessage();
     m.setExpireAt(System.currentTimeMillis() + 5L);


Mime
View raw message